Java 8 is coming, so it's time to study its new features. Java 7 and Java 6 were rather minor releases, but Java 8 will be a big step forward. Maybe even too big? Today I will give you a thorough explanation of a new abstraction in JDK 8: CompletableFuture<T>. As you know, Java 8 should be released in less than a year, so this article is based on JDK 8 build 88 with lambda support. CompletableFuture<T> extends Future<T> with functional, monadic (!) operations and promotes an asynchronous, event-driven programming model, as opposed to the blocking style of older Java versions. If you open the JavaDoc of CompletableFuture<T> you will surely be overwhelmed. There are about fifty methods (!), and some of them are extremely cryptic and exotic, for example:
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletableFuture<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn,
Executor executor)
Don't worry, just keep reading. CompletableFuture combines all the features of Guava's ListenableFuture and SettableFuture. Moreover, built-in lambda support brings it even closer to Scala/Akka futures. This may sound too good to be true, but read on. CompletableFuture is superior to good old Future<T> in two main areas: support for asynchronous callbacks and transformations, and the ability to set the value of a CompletableFuture from any thread at any time.
1. Extracting and modifying the wrapped value
Typically a future represents a piece of code run by another thread, but this is not always the case. Sometimes you want to create a Future that represents an event you know will occur, such as the arrival of a JMS message. So you have a Future<Message>, but there is no asynchronous job underlying it. You simply want to complete (resolve) the future when the JMS message arrives, which is driven by an event. In this case you can create a CompletableFuture, return it to your client, and whenever you think the result is available, simply complete() the future to unlock all clients waiting on it.
First, create a new CompletableFuture and hand it to your client:
public CompletableFuture<String> ask() {
final CompletableFuture<String> future = new CompletableFuture<>();
//...
return future;
}
Note that this future is not associated with any Callable<String>; there is no thread pool and no asynchronous job. If client code now calls ask().get(), it will block forever. If it registers completion callbacks, they will never fire. So what's the point? Now you can say:
future.complete("42")
...and at that very moment all clients blocked on Future.get() will receive the result string, and completion callbacks will fire immediately. This comes in handy when you want to represent a task in the future that is not necessarily a computation running on some thread. CompletableFuture.complete() only takes effect once; subsequent calls are ignored. But there is also a back door, CompletableFuture.obtrudeValue(...), which overwrites the previous value of the Future with a new one, so use it with caution.
Sometimes you want to signal failure. As you know, a Future object can hold either a result or an exception. If you want to pass an exception on, use CompletableFuture.completeExceptionally(ex) (or the extremely powerful obtrudeException(ex), which overrides a previous exception). completeExceptionally() also unlocks all waiting clients, but this time by throwing an exception from get(). Speaking of get(), there is also the CompletableFuture.join() method, with subtle differences in error handling: in the released JDK 8 API, join() reports the failure as an unchecked CompletionException, whereas get() throws a checked ExecutionException. Otherwise they behave the same. Finally, there is CompletableFuture.getNow(valueIfAbsent), which does not block but returns the given default value if the Future has not completed yet. This is useful when building robust systems where we don't want to wait too long.
The last static utility method is completedFuture(value), which returns an already completed Future object. It can be useful for testing or when writing an adapter layer.
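To make the behaviour described in this section concrete, here is a minimal sketch written against the released JDK 8 API. The class name ManualCompletionSketch and its two helper methods are made up for illustration; only the CompletableFuture calls come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class, not part of any library.
class ManualCompletionSketch {

    // Completes a future by hand and reports what a client would observe.
    static String completeByHand() {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Not completed yet: getNow() returns the fallback instead of blocking.
        String before = future.getNow("pending");
        boolean first = future.complete("42");   // true: this call set the value
        boolean second = future.complete("43");  // false: already completed, ignored
        return before + "," + first + "," + second + "," + future.join();
    }

    // Fails a future and compares how get() and join() report the failure.
    static String failByHand() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("no reply"));
        String viaGet;
        try {
            viaGet = future.get();
        } catch (ExecutionException | InterruptedException e) {
            viaGet = e.getClass().getSimpleName();   // checked exception from get()
        }
        String viaJoin;
        try {
            viaJoin = future.join();
        } catch (CompletionException e) {
            // unchecked exception from join(); the original failure is the cause
            viaJoin = e.getClass().getSimpleName() + ":" + e.getCause().getMessage();
        }
        return viaGet + "," + viaJoin;
    }

    public static void main(String[] args) {
        System.out.println(completeByHand());
        System.out.println(failByHand());
        System.out.println(CompletableFuture.completedFuture("done").isDone());
    }
}
```

Note that complete() reports through its boolean return value whether the call actually set the result, which is how a caller can tell that a later call was ignored.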
2. Creating and obtaining a CompletableFuture
Okay, so is creating a CompletableFuture manually our only option? Not quite. Just as with regular Futures, we can wrap an existing task in a CompletableFuture using the following family of factory methods:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
Methods that do not take an Executor argument but end with ...Async use ForkJoinPool.commonPool() (a global, general-purpose pool introduced in JDK 8). This applies to most methods in the CompletableFuture class. runAsync() is easy to understand: it takes a Runnable, so it returns CompletableFuture<Void>, because a Runnable returns no value. If you need to process something asynchronously and return a result, use Supplier<U>:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
//...long running...
return "42";
}
}, executor);
But don't forget, there are lambda expressions in Java 8!
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//...long running...
return "42";
}, executor);
or:
final CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);
This article is not about lambdas, but I will be using them quite extensively.
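For readers who want to run the factory methods, here is a small self-contained sketch. It assumes a throwaway single-thread executor, and longRunningTask() is a hypothetical stand-in for real work; neither appears in the snippets above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of any library.
class FactoryMethodsSketch {

    // Hypothetical stand-in for a slow computation.
    static String longRunningTask(int input) {
        return String.valueOf(input * 2);
    }

    // supplyAsync() with an explicit executor: the task produces a value.
    static String viaSupplyAsync() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                    .supplyAsync(() -> longRunningTask(21), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    // runAsync() without an executor: the task produces no value.
    static boolean viaRunAsync() {
        AtomicBoolean ran = new AtomicBoolean(false);
        // No executor given, so the default one (normally ForkJoinPool.commonPool()) is used.
        CompletableFuture<Void> done = CompletableFuture.runAsync(() -> ran.set(true));
        done.join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(viaSupplyAsync());
        System.out.println(viaRunAsync());
    }
}
```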
3. Transforming and acting on one CompletableFuture (thenApply)
I said that CompletableFuture is superior to Future, but you haven't yet seen why. Simply put, it is because CompletableFuture is a monad and a functor. Not helping, I guess? Both Scala and JavaScript allow you to register an asynchronous callback that runs when a future completes, so we don't have to wait and block until it is ready. We can simply say: run this function on the result when it arrives. In addition, we can stack such functions, combine multiple futures together, and so on. For example, if we have a function from String to Integer, we can turn a CompletableFuture<String> into a CompletableFuture<Integer> without unwrapping it. This is done with the thenApply() family of methods:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
As stated before, ...Async versions are provided for most operations on CompletableFuture, so I will skip them in later sections. Just remember that the first method applies the function in the same thread in which the future completed, while the remaining two apply it asynchronously in a different thread pool.
Let's see how thenApply() works:
CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
Or in one statement:
CompletableFuture<Double> f3 =
f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
Here you see a sequence of transformations, from String to Integer and then to Double. But most importantly, these transformations are neither executed immediately nor blocking. They are simply remembered, and when the original f1 completes they are executed for you. If some of the transformations are time-consuming, you can supply your own Executor to run them asynchronously. Note that this operation is equivalent to the monadic map in Scala.
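The "remembered, not executed" behaviour is easy to check. The sketch below builds the same String to Integer to Double pipeline on a future that is completed by hand; the class and method names are invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of any library.
class ThenApplySketch {

    // Builds the pipeline first and only then supplies the input value.
    static double circleArea(String radiusText) {
        CompletableFuture<String> f1 = new CompletableFuture<>();
        CompletableFuture<Double> f3 =
                f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
        // Nothing has run yet: the transformations are only registered.
        if (f3.isDone()) {
            throw new IllegalStateException("pipeline ran too early");
        }
        f1.complete(radiusText);   // both transformations run now, in this thread
        return f3.join();
    }

    public static void main(String[] args) {
        System.out.println(circleArea("2"));
    }
}
```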
4. Running code on completion (thenAccept/thenRun)
CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);
These two methods are typical "final" stages in a future pipeline. They let you consume the future's value when it is ready. While thenAccept() provides the final value, thenRun() executes a Runnable, which does not even have access to the computed value. For example:
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");
...Async variants are also available for both methods, with implicit and explicit executors.
I can't emphasize this enough: the thenAccept()/thenRun() methods do not block (even without an explicit executor). Treat them like an event listener/handler that you attach to a future and that will execute at some point in the future. The "Continuing" message will appear immediately, even if the future is nowhere near completion.
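A small sketch of this listener-like behaviour, assuming a hand-completed future and an in-memory event list in place of the logger (both are my own additions for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of any library.
class ThenAcceptSketch {

    // Records the order in which things happen around thenAccept()/thenRun().
    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Double> future = new CompletableFuture<>();
        // Registering the callbacks returns immediately; nothing is executed yet.
        future.thenAccept(dbl -> events.add("Result: " + dbl))
              .thenRun(() -> events.add("Done"));
        events.add("Continuing");
        future.complete(3.14);   // the callbacks fire here
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The thenRun() stage is chained after thenAccept() rather than registered separately on the same future, so the relative order of the two callbacks is well defined.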
5. Error handling of a single CompletableFuture
So far, we have only discussed the results of computations. But what about exceptions? Can we handle them asynchronously as well? Certainly!
CompletableFuture<String> safe =
future.exceptionally(ex -> "We have a problem: " + ex.getMessage());
exceptionally() takes a function that is invoked when the original future throws an exception. This gives us the opportunity to recover by converting the exception into a value compatible with the Future's type. Further transformations of safe will no longer yield an exception but instead the String returned by the supplied function.
A more flexible approach is handle(), which takes a function that receives either the correct result or the exception:
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
if (ok != null) {
return Integer.parseInt(ok);
} else {
log.warn("Problem", ex);
return -1;
}
});
handle() is always called, with either the result or the exception argument being non-null. This is a one-stop, catch-all strategy.
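Both recovery styles can be tried out with a future that is failed by hand. In this sketch the class name and the helpers failed(), recover() and parseOrDefault() are hypothetical; the handle() variant tests ex == null rather than ok != null, which is one possible way to tell the two cases apart.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of any library.
class ErrorHandlingSketch {

    // A future that has already failed with a known exception.
    static CompletableFuture<String> failed() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    // exceptionally(): turn a failure into a fallback value of the same type.
    static String recover(CompletableFuture<String> future) {
        return future.exceptionally(ex -> "We have a problem: " + ex.getMessage()).join();
    }

    // handle(): one function sees either the result or the exception.
    static int parseOrDefault(CompletableFuture<String> future) {
        return future.handle((ok, ex) -> ex == null ? Integer.parseInt(ok) : -1).join();
    }

    public static void main(String[] args) {
        System.out.println(recover(failed()));
        System.out.println(parseOrDefault(CompletableFuture.completedFuture("42")));
        System.out.println(parseOrDefault(failed()));
    }
}
```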
6. Combining two CompletableFutures
Asynchronous processing of a single CompletableFuture is nice, but its real power shows when multiple such futures are combined in various ways.
7. Combining (chaining) two futures (thenCompose())
Sometimes you want to run a function on a future's value (when it is ready), but that function also returns a future. CompletableFuture should be smart enough to understand that the result of our function should now be used as the top-level future, as opposed to a CompletableFuture<CompletableFuture<T>>. The method thenCompose() is thus equivalent to Scala's flatMap:
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
...Async variations are also available. In the following example, look carefully at the types and at the difference between thenApply() (map) and thenCompose() (flatMap) when applying a calculateRelevance() function that returns a CompletableFuture<Double>:
CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f =
docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture =
docFuture.thenCompose(this::calculateRelevance);
//...
private CompletableFuture<Double> calculateRelevance(Document doc) //...
thenCompose() is an essential method that allows building robust, asynchronous pipelines without blocking or waiting for intermediate steps.
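Here is a runnable version of the idea, as a sketch only: the document is reduced to a plain String and calculateRelevance() is a made-up scoring function, since the article does not show the real Document type.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of any library.
class ThenComposeSketch {

    // Hypothetical asynchronous step: scores a document by its length.
    static CompletableFuture<Double> calculateRelevance(String doc) {
        return CompletableFuture.supplyAsync(() -> doc.length() / 10.0);
    }

    static double relevanceOf(String doc) {
        CompletableFuture<String> docFuture = CompletableFuture.completedFuture(doc);
        // thenApply() would give CompletableFuture<CompletableFuture<Double>> here;
        // thenCompose() flattens the nested future into a single one.
        CompletableFuture<Double> relevance =
                docFuture.thenCompose(ThenComposeSketch::calculateRelevance);
        return relevance.join();
    }

    public static void main(String[] args) {
        System.out.println(relevanceOf("hello"));
    }
}
```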
8. Transforming the values of two futures (thenCombine())
While thenCompose() is used to chain one future that depends on another, thenCombine() combines two independent futures when both are done:
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
...Async variations are also available. Imagine you have two CompletableFutures, one loading a Customer and the other loading the nearest Shop. They are completely independent of each other, but when both are completed you want to use their values to calculate a Route. Here is a stripped-down example:
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture =
customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//...
private Route findRoute(Customer customer, Shop shop) //...
Note that in Java 8 you can replace (cust, shop) -> findRoute(cust, shop) with a simple this::findRoute method reference:
customerFuture.thenCombine(shopFuture, this::findRoute);
So you get the idea. We have customerFuture and shopFuture. routeFuture then wraps them and "waits" for both to complete. When both are ready, it runs the function we supplied to combine the results (findRoute()). Thus routeFuture completes when the two underlying futures are resolved and findRoute() is done.
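A self-contained variant of the route example might look like this. Customer, Shop and Route are replaced by plain strings, and findRoute() is a trivial placeholder, so treat it as a sketch of the mechanics rather than of the real domain code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of any library.
class ThenCombineSketch {

    // Placeholder for the real route calculation.
    static String findRoute(String customer, String shop) {
        return customer + " -> " + shop;
    }

    static String route() {
        // Two independent asynchronous lookups.
        CompletableFuture<String> customerFuture =
                CompletableFuture.supplyAsync(() -> "customer-123");
        CompletableFuture<String> shopFuture =
                CompletableFuture.supplyAsync(() -> "closest-shop");
        // findRoute() runs once both inputs are available.
        CompletableFuture<String> routeFuture =
                customerFuture.thenCombine(shopFuture, ThenCombineSketch::findRoute);
        return routeFuture.join();
    }

    public static void main(String[] args) {
        System.out.println(route());
    }
}
```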
9. Waiting for both CompletableFutures to complete
If, instead of producing a new CompletableFuture that combines both results, we simply want to be notified when they finish, we can use the thenAcceptBoth()/runAfterBoth() family of methods (...Async variations are also available). They work similarly to thenAccept() and thenRun(), but wait for two futures instead of one:
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)
Imagine that in the example above, instead of producing a new CompletableFuture<Route>, you just want to send some event or refresh the GUI immediately. This is easily achieved with thenAcceptBoth():
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
final Route route = findRoute(cust, shop);
//refresh GUI with route
});
I hope I'm wrong, but maybe some of you are asking yourselves: why can't I simply block on these two futures? Like this:
Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());
Well, of course you can. But the whole point of CompletableFuture is to allow an asynchronous, event-driven programming model instead of blocking and eagerly waiting for the result. So functionally the two code snippets above are equivalent, but the latter unnecessarily occupies a thread of execution.
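The event-driven variant can be observed without any extra threads. In the following sketch the two futures are completed by hand and an AtomicReference stands in for the GUI; all names are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of any library.
class ThenAcceptBothSketch {

    static String notifyWhenBothDone() {
        CompletableFuture<String> customerFuture = new CompletableFuture<>();
        CompletableFuture<String> shopFuture = new CompletableFuture<>();
        AtomicReference<String> shown = new AtomicReference<>("nothing yet");

        // Registering the callback does not block the calling thread.
        customerFuture.thenAcceptBoth(shopFuture,
                (cust, shop) -> shown.set(cust + " -> " + shop));

        customerFuture.complete("customer-123");
        String afterFirst = shown.get();       // only one input so far: no callback yet
        shopFuture.complete("closest-shop");   // second input arrives: callback fires
        return afterFirst + " | " + shown.get();
    }

    public static void main(String[] args) {
        System.out.println(notifyWhenBothDone());
    }
}
```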
10. Waiting for the first CompletableFuture to complete
Another interesting part of the CompletableFuture API is the ability to wait for the first (as opposed to all) future to complete. This comes in handy when you have two tasks yielding results of the same type and you only care about response time, not about which task finished first. API methods (...Async variations are also available):
CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)
As an example, say you have two systems that you integrate with. One has a smaller average response time but a high standard deviation; the other is generally slower but more predictable. To get the best of both worlds (performance and predictability), you can call both systems at the same time and wait for whichever finishes first. Usually this will be the first system, but when it becomes slow, the second one completes in an acceptable time:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
System.out.println("Result: " + s);
});
s represents the String reply from either fetchFast() or fetchPredictably(). We neither know nor care which.
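To keep the outcome deterministic, the sketch below simulates the case where the normally fast system stalls: its future is never completed, while the predictable one is already done. The class name and the AtomicReference used to capture the reply are assumptions of this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of any library.
class AcceptEitherSketch {

    static String firstReply() {
        // Simulates a stalled "fast" system: this future never completes here.
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<String> predictable =
                CompletableFuture.completedFuture("predictable");

        AtomicReference<String> seen = new AtomicReference<>();
        // The consumer receives whichever reply is available first.
        fast.acceptEither(predictable, seen::set).join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(firstReply());
    }
}
```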
11. Transforming the first completed future
applyToEither() is the older brother of acceptEither(). While the latter simply calls some piece of code when the faster of the two futures completes, applyToEither() returns a new future. This new future completes when the first of the two underlying futures completes. The API is quite similar (...Async variations are also available):
<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)
The extra fn function is invoked on the result of whichever future completes first. I'm not sure what the purpose of such a specialized method is; after all, one could simply use: fast.applyToEither(predictable).thenApply(fn). Since we're stuck with this API but don't really need the extra function application, I'll simply use the Function.identity() placeholder:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone =
fast.applyToEither(predictable, Function.<String>identity());
The firstDone future can then be passed around. Note that from the client's perspective, the fact that two futures are actually behind firstDone is hidden. The client simply waits for the future to complete, and applyToEither() takes care of notifying the client when either of the two finishes first.
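The following sketch shows firstDone from the client's side. Both underlying futures are completed by hand so that the order is fixed; the class and method names are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical demo class, not part of any library.
class ApplyToEitherSketch {

    // The client only ever sees the returned future, not the two behind it.
    static CompletableFuture<String> firstDone(CompletableFuture<String> fast,
                                               CompletableFuture<String> predictable) {
        return fast.applyToEither(predictable, Function.<String>identity());
    }

    static String demo() {
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<String> predictable = new CompletableFuture<>();
        CompletableFuture<String> firstDone = firstDone(fast, predictable);

        fast.complete("fast reply");                 // first to finish: firstDone completes
        predictable.complete("predictable reply");   // too late, firstDone is already set
        return firstDone.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```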
12. Combining multiple CompletableFutures
We now know how to wait for two futures to complete (using thenCombine()) and for the first one to complete (applyToEither()). But can this scale to any number of futures? Sure, using static helper methods:
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf() takes an array of futures and returns a future that completes when all the underlying futures have completed (a barrier waiting for all of them). anyOf(), on the other hand, waits only for the fastest of the underlying futures. Take a look at the generic types of the returned futures. Not quite what you would expect? We will address this issue in the next article.
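As a quick illustration of those return types, here is one possible way to use the two helpers. Reading the values back from the input futures after allOf() is just one workaround I am assuming here; it is not necessarily the approach taken in the follow-up article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of any library.
class AllOfAnyOfSketch {

    static List<String> waitForAll() {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "a");
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "b");
        CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> "c");
        List<CompletableFuture<String>> futures = Arrays.asList(a, b, c);

        // allOf() yields CompletableFuture<Void>: it signals completion only.
        CompletableFuture<Void> all = CompletableFuture.allOf(a, b, c);
        // Once it is done, join() on each input returns without blocking.
        return all.thenApply(ignored ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()))
                .join();
    }

    static Object waitForAny() {
        CompletableFuture<String> never = new CompletableFuture<>();   // never completes
        CompletableFuture<String> ready = CompletableFuture.completedFuture("ready");
        // anyOf() yields CompletableFuture<Object>: the element type is lost.
        return CompletableFuture.anyOf(never, ready).join();
    }

    public static void main(String[] args) {
        System.out.println(waitForAll());
        System.out.println(waitForAny());
    }
}
```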
Summary
We explored the entire CompletableFuture API. I believe this was overwhelming, so in the next article we will develop another implementation of a simple web crawler, taking advantage of CompletableFuture methods and Java 8 lambda expressions. We will also look at CompletableFuture