This tutorial shows you how to create a structured concurrency in Java using StructuredTaskScope
, including how to use ShutDownOnFailure
, ShutDownOnSuccess
, or custom policy.
Java allows you to execute code concurrently by creating threads. Java 19 brought an improvement by introducing a feature called virtual threads, which is lighter and easier to use than the traditional threads. Therefore, you can create virtual threads to run tasks concurrently. However, if you have multiple tasks executed in different threads, one of the problems that commonly arises is how to control when to cancel the execution of tasks. There are some different policies for it. For example, when a task fails, the other tasks should not be continued since the result will be useless. In another policy, we only want to get the result of a task that completes first. Handling such cases can be quite complex.
Fortunately, Java 21 introduced a feature called structured concurrency. It adds a class named StructuredTaskScope
, which is designed to streamline error handling and cancellation. Besides reducing the effort for writing the code, it can also improve the readability of the code.
Using StructuredTaskScope
To create a structured concurrency using StructuredTaskScope
, you have to create an instance of it. The StructuredTaskScope
class implements AutoCloseable
. Therefore, you can use a try-with-resources block.
Inside the block, you need to create some subtasks to be executed using the fork(Callable)
method. Then, call the join()
or joinUntil(Instant)
method of the scope, which makes it wait until a certain condition meets. For example, you can wait until a subtask has completed or until any subtask fails, depending on the shutdown policy used.
StructuredTaskScope
States
A StructuredTaskScope
has three states: OPEN
, SHUTDOWN
, and CLOSED
. The order of the states are OPEN
-> SHUTDOWN
-> CLOSED
. It's not possible for the state to go backward.
The initial state is OPEN
. In this state, it's possible to fork new subtasks while waiting for existing subtasks to complete.
The second state is SHUTDOWN
. Any thread can set the scope's state to SHUTDOWN
, not necessarily the scope owner. In this state, all subtasks have been completed (either success or failed) or interrupted. If you try to fork a new subtask, it won't be executed. Updating to this state can be done by using the shutdown()
method. Usually, it's already handled by the classes that extend StructuredTaskScope
, such as ShutDownOnFailure
and ShutDownOnSuccess
, when a certain condition meets. However, you can also call it manually if necessary. The isShutdown()
method can be used to check whether the scope is already shut down.
The last state is CLOSED
. Only the scope owner can update to this state. In this state, it's not allowed to fork a new subtask or call the join()
or joinUntil(Instant)
method. If you do one of them, an IllegalStateException
exception will be thrown. Updating the state to CLOSED
is handled inside the close()
method. You don't need to manually call the method if you use a try-with-resources block.
Create Subtasks
To create a subtask to be run concurrently, you can use the fork(Callable)
method of StructuredTaskScope
. You have to pass a Callable
to be run as the argument.
public <U extends T> Subtask<U> fork(Callable<? extends U> task)
The method starts a new virtual thread to execute a value-returning method that's passed as the argument. It creates the new thread using the ThreadFactory
of the scope. The current ScopedValue
bindings are inherited to the new thread as well.
The return type of the fork
method is Subtask
, which represents a forked subtask. Using the returned Subtask
object, you can get the current state of the subtask.
Get Subtask State
First, you can get the state of the subtask by calling the state()
method which returns an enum whose values are:
UNAVAILABLE
: The subtask has been forked but not completed or forked/completed after the scope was shut down. The result or exception is not available.SUCCESS
: The subtask completed successfully with a result. The result can be obtained by using theget()
method.FAILED
: The subtask failed with an exception. The exception can be obtained by using theexception()
method.
Example:
State state = mySubtask.state();
Call join()
Method
The join()
method is used to wait for all subtasks of the scope to finish or the task scope to shut down. The call to the method is required if you want to get the result or exception of the scope.
public StructuredTaskScope<T> join() throws InterruptedException
There are several rules regarding how to call the join()
method.
- It can only be invoked by the scope owner. If you call it from a forked thread for example, it will throw
java.lang.WrongThreadException: Current thread not owner
. - It can only be invoked if the state of the scope is not
CLOSED
. If the scope is already closed and you call thejoin()
method, you'll getjava.lang.IllegalStateException: Task scope is closed
exception. - It may throw
InterruptedException
if the thread is interrupted while waiting.
List<Double> getResult() throws InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<Double> subtask1 = scope.fork(Math::random);
Subtask<Double> subtask2 = scope.fork(Math::random);
scope.join();
// Get result or exception
return List.of(subtask1.get(), subtask2.get());
}
}
There is also an alternative method called joinUntil(Instant)
which does the similar thing, but with a deadline. It can also throw a TimeoutException
if the waiting time exceeds the deadline.
public StructuredTaskScope<T> joinUntil(Instant deadline) throws InterruptedException, TimeoutException
Get Subtask Result
If a subtask has completed successfully, you can get the returned value by using the get()
method.
Subtask<Double> mySubtask = scope.fork(Math::random);
// Other subtasks
scope.join();
Double subtask1Result = mySubtask.get();
The method may throw an IllegalStateException
in these conditions:
- If the owner did not join after forking subtask
- If the subtask has not completed (status:
UNAVAILABLE
). - If the subtask completed unsuccessfully (status:
FAILED
).
Therefore, you have to make sure to only call the get()
only after the join()
or joinUntil(Instant)
method of the scope has been called and the subtask status is SUCCESS
.
Get Subtask Exception
A subtask can also fail. If that happens, you may want to get the exception in order to check the cause. It can be done by calling the exception()
method.
Subtask<Double> mySubtask = scope.fork(() -> {
throw new RuntimeException();
});
// Other subtasks
scope.join();
Throwable subtask1Exception = mySubtask.exception();
The method may throw an IllegalStateException
in these conditions:
- If the owner did not join after forking subtask
- If the subtask has not completed (status:
UNAVAILABLE
). - If the subtask completed with a result (status:
SUCCESS
).
That means you should only call the exception()
after calling the join()
or joinUntil(Instant)
method of the scope and if the subtask completed with an exception.
Shut Down Policies
If there are several subtasks running concurrently, sometimes it would be preferred to cancel unfinished subtasks on a certain condition. For example, if one of the subtasks fails, the others should be canceled because the results will be useless. However, in another case, we only want to get the result from any subtask that completes successfully first. Java already provides two implementations of StructuredTaskScope
, ShutdownOnFailure
and ShutdownOnSuccess
to handle those two different policies.
ShutdownOnFailure
Policy
With this policy, the scope's shutdown()
method will be called when any subtask throws an exception. It causes the unfinished threads to be interrupted and awakens the task scope owner. This policy is suitable for cases where you need all subtasks to complete successfully because the results will be useless if any of them fails.
The ShutdownOnFailure
class also provides some additional methods. There is a method called exception
, which is used to return the exception of the first subtask that failed. If there is no failed subtask, it will return an empty Optional
.
public Optional<Throwable> exception()
It also has methods named throwIfFailed
for throwing an exception when any subtask fails.
public void throwIfFailed() throws ExecutionException
public <X extends Throwable> void throwIfFailed(Function<Throwable, ? extends X> esf) throws X
There are several rules for calling the exception
and throwIfFailed
methods.
- The methods can only be called by the scope owner. Otherwise, it will throw a
WrongThreadException
. - The methods can only be called after the
join()
orjoinUntil(Instant)
method has been called. Otherwise, it will throw anIllegalStateException
. - The parameter-less
throwIfFailed
method will throw anExecutionException
if there is a failed subtask with the exception of the first failed subtask as thecause
. Meanwhile, thethrowIfFailed
method that has a parameter allows you to throw any exception by passing a function.
In the example below, there is a structured concurrency using the ShutdownOnFailure
policy with two subtasks.
public class ShutDownOnFailureExample {
record HighScore(String user, Integer score) {}
record Result(Integer userScore, List<HighScore> highScores) {}
private Integer getUserScore() throws InterruptedException {
int delay = 100;
System.out.println("getUserScore will be delayed for " + delay + " ms");
Thread.sleep(delay);
return 50;
}
private List<HighScore> getHighScores() throws InterruptedException {
int delay = 50;
System.out.println("getHighScores will be delayed for " + delay + " ms");
Thread.sleep(delay);
return List.of(
new HighScore("Player A", 100),
new HighScore("Player B", 90),
new HighScore("Player C", 80)
);
}
Result getData() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<Integer> userScoreSubtask = scope.fork(this::getUserScore);
StructuredTaskScope.Subtask<List<HighScore>> highScoresSubtask = scope.fork(this::getHighScores);
scope.join()
.throwIfFailed();
return new Result(userScoreSubtask.get(), highScoresSubtask.get());
}
}
private void runExample() throws ExecutionException, InterruptedException, TimeoutException {
Result result = getData();
System.out.println("User score: " + result.userScore);
System.out.println("High scores: " + result.highScores);
}
public static void main(String[] args) {
ShutDownOnFailureExample example = new ShutDownOnFailureExample();
try {
example.runExample();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
}
}
Output:
getHighScores will be delayed for 50 ms
getUserScore will be delayed for 100 ms
User score: 50
High scores: [HighScore[user=Player A, score=100], HighScore[user=Player B, score=90], HighScore[user=Player C, score=80]]
If there is no exception thrown, the call to throwIfFailed()
will not throw any exception as well. The result of each subtask can be obtained by using the get()
method.
Now we change one of the subtasks to throw an exception.
private Integer getUserScore() throws InterruptedException {
int delay = 100;
System.out.println("getUserScore will be delayed for " + delay + " ms");
Thread.sleep(delay);
throw new RuntimeException("getUserScore error");
}
Output:
getUserScore will be delayed for 100 ms
getHighScores will be delayed for 50 ms
java.lang.RuntimeException: getUserScore error
As a result, an exception is thrown because of the call to the throwIfFailed
method. In case the throwIfFailed
method is not called, it won't throw the exception. However, if you try to get the result of a subtask whose state is not SUCCESS
, you'll get an IllegalStateException
.
Another alternative is to use the throwIfFailed(Function<Throwable, ? extends X> esf)
method, which has one parameter to pass a function which can return any exception that you want. When there is a failed subtask, the function will be invoked with the first exception thrown as the argument.
scope.join()
.throwIfFailed((ex) -> {
throw new RuntimeException("Wrapped exception", ex);
});
If you want to get the exception without rethrowing it or mapping it to another exception, you can call the exception()
method.
Optional<Throwable> exception = scope.join()
.exception();
exception.ifPresent(Throwable::printStackTrace);
Below is another case where both subtasks throw an exception.
private Integer getUserScore() throws InterruptedException {
int delay = 100;
System.out.println("getUserScore will be delayed for " + delay + " ms");
Thread.sleep(delay);
throw new RuntimeException("getUserScore error");
}
private List<HighScore> getHighScores() throws InterruptedException {
int delay = 50;
System.out.println("getHighScores will be delayed for " + delay + " ms");
Thread.sleep(delay);
throw new RuntimeException("getHighScores error");
}
Output:
getHighScores will be delayed for 50 ms
getUserScore will be delayed for 100 ms
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: getHighScores error
Because highScoresSubtask
throws the exception before userScoreSubtask
, the exception of highScoresSubtask
is the one that will be thrown when calling throwIfFailed
. The same also applies if you call the exception
method. In fact, most likely the userScoreSubtask
is already interrupted before throwing an exception.
ShutdownOnSuccess
Policy
This policy calls the shutdown()
method when any of the subtasks has completed with a result. As it happens, the unfinished threads are interrupted and the task scope owner is awakened. You should use this policy for cases where you only need to get the result from any subtask that completes the quickest.
The ShutdownOnSuccess
class provides several methods called result
for returning the result of the first subtask.
public T result() throws ExecutionException
public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X
There are several rules for calling the result
methods.
- The methods can only be called by the scope owner. Otherwise, it will throw a
WrongThreadException
. - The methods can only be called after the
join()
orjoinUntil(Instant)
method has been called. Otherwise, it will throw anIllegalStateException
. - If there's no subtask completed, the methods will throw an
IllegalStateException
. - The one without parameter will throw an
ExecutionException
if no subtasks completed but at least one task failed. Meanwhile, the one with a parameter allows you to throw any exception by passing a function.
The ShutdownOnSuccess
class has a parameterized type which is used to specify the type of the value returned by the subtasks, since it's quite common for the subtasks to have the same return type. If you don't specify the type, the return type of the result
methods will be an Object
.
In the example below, we have two subtasks for getting an opponent. We only want to get one result from the quickest one.
public class ShutDownOnSuccessExample {
record Player(String id, String name) {}
private Player getOpponent1Subtask() throws InterruptedException {
int delay = 100;
System.out.println("getOpponent1 will be delayed for " + delay + " ms");
Thread.sleep(delay);
System.out.println("getOpponent1Subtask - done");
return new Player("A", "Player A");
}
private Player getOpponent2() throws InterruptedException {
int delay = 50;
System.out.println("getOpponent2 will be delayed for " + delay + " ms");
Thread.sleep(delay);
System.out.println("getOpponent2 - done");
return new Player("B", "Player B");
}
Player getOpponent() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Player>()) {
StructuredTaskScope.Subtask<Player> getOpponent1Subtask = scope.fork(this::getOpponent1Subtask);
StructuredTaskScope.Subtask<Player> getOpponent2Subtask = scope.fork(this::getOpponent2);
System.out.println("-----Before join-----");
System.out.println("getOpponent1Subtask - state: " + getOpponent1Subtask.state());
System.out.println("getOpponent2Subtask - state: " + getOpponent2Subtask.state());
Player opponent = scope.join()
.result();
System.out.println(opponent);
System.out.println("-----After join-----");
System.out.println("getOpponent1Subtask - state: " + getOpponent1Subtask.state());
System.out.println("getOpponent2Subtask - state: " + getOpponent2Subtask.state());
return opponent;
}
}
private void runExample() throws ExecutionException, InterruptedException {
Player opponent = getOpponent();
}
public static void main(String[] args) {
ShutDownOnSuccessExample example = new ShutDownOnSuccessExample();
try {
example.runExample();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Output:
-----Before join-----
getOpponent1Subtask - state: UNAVAILABLE
getOpponent2Subtask - state: UNAVAILABLE
getOpponent2 will be delayed for 50 ms
getOpponent1 will be delayed for 100 ms
getOpponent2 - done
Player[id=B, name=Player B]
-----After join-----
getOpponent1Subtask - state: UNAVAILABLE
getOpponent2Subtask - state: SUCCESS
From the output, we can see that the initial state of both subtasks are UNAVAILABLE
. Then, it turns out that the getOpponent2Subtask
completed faster since it has a shorter delay time. As a result, the result returned by the result
method is the one from the getOpponent2Subtask
. The getOpponent1Subtask
is interrupted and hence it doesn't complete which can be seen that the 'getOpponent1Subtask - done' text is not printed. After scope is shut down, the state of the getOpponent2Subtask
becomes SUCCESS
, while the state of the getOpponent1Subtask
remains UNAVAILABLE
.
Now, we want to see what will happen if one of the subtasks fails. For example, we change the getOpponent2
method to throw an exception.
private Player getOpponent1Subtask() throws InterruptedException {
int delay = 100;
System.out.println("getOpponent1Subtask will be delayed for " + delay + " ms");
Thread.sleep(delay);
System.out.println("getOpponent1Subtask - done");
return new Player("A", "Player A");
}
private Player getOpponent2() throws InterruptedException {
int delay = 50;
System.out.println("getOpponent2 will be delayed for " + delay + " ms");
Thread.sleep(delay);
throw new RuntimeException("getOpponent2 error");
}
Output:
-----Before join-----
getOpponent1Subtask - state: UNAVAILABLE
getOpponent2Subtask - state: UNAVAILABLE
getOpponent2 will be delayed for 50 ms
getOpponent1 will be delayed for 100 ms
getOpponent1Subtask - done
Player[id=A, name=Player A]
-----After join-----
getOpponent1Subtask - state: SUCCESS
getOpponent2Subtask - state: FAILED
If there is a failed subtask, the others will not be interrupted. In addition, the result
method is still able to return a value as long as there is a subtask that executed successfully.
Now, let's see what's going to happen if both subtasks are failed.
private Player getOpponent1Subtask() throws InterruptedException {
int delay = 100;
System.out.println("getOpponent1Subtask will be delayed for " + delay + " ms");
Thread.sleep(delay);
throw new RuntimeException("getOpponent1Subtask error");
}
private Player getOpponent2() throws InterruptedException {
int delay = 50;
System.out.println("getOpponent2 will be delayed for " + delay + " ms");
Thread.sleep(delay);
throw new RuntimeException("getOpponent2 error");
}
Output:
-----Before join-----
getOpponent1Subtask - state: UNAVAILABLE
getOpponent2Subtask - state: UNAVAILABLE
getOpponent2 will be delayed for 50 ms
getOpponent1 will be delayed for 100 ms
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: getOpponent2 error
at com.woolha.example.structuredconcurrency.ShutDownOnSuccessExample.main(ShutDownOnSuccessExample.java:60)
Because there is no result available, the call to result
causes an exception. Java will throw the exception that occurred first. In the example above, the exception from getOpponent2Subtask
is the one that's thrown because it's thrown before the exception from getOpponent1Subtask
.
Alternatively, you can change the code to use the result(Function<Throwable, ? extends X> esf)
method, which accepts a function that allows you to throw any exception that you want. If no subtask is successful and there is any subtask throwing an exception, the passed function will be invoked with an argument whose value is the first exception thrown by the subtasks.
Player opponent = scope.join()
.result((ex) -> { // ex is the first exception thrown
throw new RuntimeException("Wrapper exception", ex);
});
Custom Policy
It's also possible to create a custom policy that use your own logic. You can create a class that extends StructuredTaskScope
. Most likely, you need to override the handleComplete
method. It's the method to be invoked when each sub task completed, either success or failed. Inside, you can write any code to determine when to call the shutdown
method. You may also need to override the join()
and joinUntil(Instant deadline)
methods to return an object whose type is the custom class. In addition, you can also add additional methods.
The example below is a custom policy that calls the shutdown
method after all subtasks have been completed. It also stores the result of successful subtasks in a List
.
static class MyCustomScope <T> extends StructuredTaskScope<T> {
private final Queue<T> results = new ConcurrentLinkedQueue<>();
private final int numberOfTasks;
private int completedCount = 0;
MyCustomScope(int numberOfTasks) {
super(null, Thread.ofVirtual().factory());
this.numberOfTasks = numberOfTasks;
}
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
System.out.println("handleComplete");
System.out.println(subtask.state());
if (subtask.state() == Subtask.State.SUCCESS) {
this.results.add(subtask.get());
}
this.completedCount++;
if (this.completedCount == this.numberOfTasks) {
System.out.println("xxx1");
super.shutdown();
} else {
System.out.println("xxx2");
}
}
@Override
public MyCustomScope<T> join() throws InterruptedException {
super.join();
return this;
}
@Override
public MyCustomScope<T> joinUntil(Instant deadline) throws InterruptedException, TimeoutException {
super.joinUntil(deadline);
return this;
}
public List<T> results() {
super.ensureOwnerAndJoined();
return new ArrayList<>(results);
}
}
Summary
In this tutorial, we have learned how to create structured concurrency in Java using StructuredTaskScope
. You can control when the threads can be canceled by selecting the shutdown policy to be used. There are some built-in policies which include ShutdownOnSuccess
and ShutdownOnFailure
. You can also create a custom policy if necessary.
If this feature is still being a preview feature in the Java version that you use, you have to add --enable-preview
to run the code.
You can also read about:
- How to use scoped values in Java, in case you need to share data across the subtask threads.