Java 8來了,是時候學一下新的東西了。 Java 7和Java 6只不過是稍作修改的版本,而Java 8將會發生重大的改進。或許是Java 8太大了吧?今天我會給你徹底地解釋JDK 8中的新的抽象CompletableFuture。眾所周知,Java 8不到一年就會發布,因此這篇文章是基於JDK 8 build 88 with lambda support的。 CompletableFuture extends Future提供了方法,一元運算子和促進非同步性以及事件驅動程式設計模型,它並不止於舊版的Java中。如果你打開JavaDoc of CompletableFuture你一定會感到震驚。大約有五十種方法(!),而且它們中的一些非常有意思而且不好理解,例如:
複製程式碼如下:public <U,V> CompletableFuture<V> thenCombineAsync(
CompletableFuture<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn,
Executor executor)
不必擔心,繼續讀下去。 CompletableFuture收集了所有ListenableFuture in Guava 和SettableFuture的特色。此外,內建的lambda表達式使它更接近Scala/Akka futures。這聽起來好得令人難以置信,但請繼續閱讀。 CompletableFuture有兩個主要的面向優於ol中的Future 非同步回呼/轉換,這使得從任何時刻的任何執行緒都可以設定CompletableFuture的值。
一、提取、修改包裝的數值
通常futures代表其它執行緒中運行的程式碼,但事實並非總是如此。有時你想要創造一個Future來表示你知道會發生什麼,例如JMS message arrival。所以你有Future但未來並沒有潛在的非同步工作。你只是想在未來JMS訊息到達時簡單地完成(解決),這是由一個事件驅動的。在這種情況下,你可以簡單地建立CompletableFuture來回饋給你的客戶端,只要你認為你的結果是可用的,僅僅透過complete()就能解鎖所有等待Future的客戶端。
首先你可以簡單地建立新的CompletableFuture並且給你的客戶端:
複製程式碼程式碼如下:public CompletableFuture<String> ask() {
final CompletableFuture<String> future = new CompletableFuture<>();
//...
return future;
}
注意這個future和Callable沒有任何联系,沒有執行緒池也不是非同步工作。如果現在客戶端程式碼呼叫ask().get()它將永遠阻塞。如果暫存器完成回調,它們就永遠不會生效了。所以關鍵是什麼?現在你可以說:
複製程式碼如下:future.complete("42")
…此時此刻所有客戶端Future.get()將會得到字串的結果,同時完成回呼以後將會立即生效。當你想代表Future的任務時是非常方便的,而且沒有必要去計算一些執行緒的任務上。 CompletableFuture.complete()只能呼叫一次,後續呼叫將被忽略。但也有一個後門叫做CompletableFuture.obtrudeValue(…)涵蓋一個新Future之前的價值,請小心使用。
有時你想要看到訊號故障的情況,如你所知Future物件可以處理它所包含的結果或異常。如果你想進一步傳遞一些異常,可以用CompletableFuture.completeExceptionally(ex) (或用obtrudeException(ex)這樣比較強的方法覆寫前面的異常)。 completeExceptionally()也能解鎖所有等待的客戶端,但這次從get()拋出異常。說到get(),也有CompletableFuture.join()方法在錯誤處理上有著細微的變動。但總體上,它們都是一樣的。最後也有CompletableFuture.getNow(valueIfAbsent)方法沒有阻塞但是如果Future還沒完成將返回預設值,這使得當構建那種我們不想等太久的健壯系統時非常有用。
最後static的方法是用completedFuture(value)來傳回已經完成Future的對象,當測試或寫一些適配器層時可能非常有用。
二、創造與獲取CompletableFuture
好了,那麼手動地建立CompletableFuture是我們唯一的選擇嗎?不一定。就像一般的Futures,我們可以關聯存在的任務,同時CompletableFuture使用工廠方法:
複製代碼代碼如下:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
無參方法Executor是以…Async結尾同時將會使用ForkJoinPool.commonPool()(全域的,在JDK8中介紹的通用池),這適用於CompletableFuture類別中的大多數的方法。 runAsync()易於理解,注意它需要Runnable,因此它傳回CompletableFuture<Void>作為Runnable不傳回任何值。如果你需要處理非同步操作並傳回結果,使用Supplier<U>:
複製代碼代碼如下:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
//...long running...
return "42";
}
}, executor);
但別忘了,Java 8裡面還有lambdas表達式呢!
複製代碼代碼如下:
finalCompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//...long running...
return "42";
}, executor);
或者:
複製代碼代碼如下:
final CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);
雖然這篇文章不是關於Lambda的,但我會相當頻繁地使用lambda表達式。
三、轉換和作用於CompletableFuture(thenApply)
我有說過CompletableFuture優於Future但你還不知道為什麼嗎?簡單來說,因為CompletableFuture是一個原子也是一個因子。我說的這句話沒什麼幫助嗎? Scala和JavaScript都允許future完成時允許註冊非同步回調,直到它準備好我們才要等待和阻止它。我們可以簡單地說:運行這個函數時就出現了結果。此外,我們可以疊加這些功能,把多個future組合在一起等。例如如果我們從String轉為Integer,我們可以轉為在不關聯的前提下從CompletableFuture到CompletableFuture<Integer。這是透過thenApply()的方法:
複製代碼代碼如下:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);<p></p>
<p>如前所述...Async版本提供CompletableFuture的大多數操作,因此我將在後面的部分中跳過它們。記住,第一個方法將在future完成的相同執行緒中呼叫該方法,而剩下的兩個將在不同的執行緒池中非同步地呼叫它。
讓我們來看看thenApply()的工作流程:</p>
<p><pre>
CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
</p>
或在一個聲明中:
複製代碼代碼如下:
CompletableFuture<Double> f3 =
f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
這裡,你會看到一個序列的轉換,從String到Integer再到Double。但最重要的是,這些轉換既不立即執行也不停止。這些轉換既不立即執行也不停止。他們只是記得,當原始f1完成他們所執行的程序。如果某些轉換非常耗時,你可以提供你自己的Executor來非同步地運行他們。注意,此操作相當於Scala中的一元map。
四、運行完成的程式碼(thenAccept/thenRun)
複製代碼代碼如下:
CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);
在future的管道裡有兩種典型的「最終」階段方法。他們在你使用future的值的時候做好準備,當thenAccept()提供最終的值時,thenRun執行Runnable,這甚至沒有方法去計算值。例如:
複製代碼代碼如下:
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");
…Async變數也可用兩種方法,隱式和明確執行器,我不會過度強調這個方法。
thenAccept()/thenRun()方法並沒有發生阻塞(即使沒有明確的executor)。它們像一個事件偵聽器/處理程序,當你連接到一個future時,這將執行一段時間。 ”Continuing”訊息將立即出現,儘管future甚至沒有完成。
五、單一CompletableFuture的錯誤處理
到目前為止,我們只討論計算的結果。那麼異常呢?我們可以非同步地處理它們嗎?當然!
複製代碼代碼如下:
CompletableFuture<String> safe =
future.exceptionally(ex -> "We have a problem: " + ex.getMessage());
exceptionally()接受一個函式時,將呼叫原始future來拋出一個例外。我們會有機會將此異常轉換為和Future類型的兼容的一些值來進行恢復。 safe進一步的轉換將不再產生一個異常而是從提供功能的函數傳回一個String值。
一個更靈活的方法是handle()接受一個函數,它接收正確的結果或異常:
複製代碼代碼如下:
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
if (ok != null) {
return Integer.parseInt(ok);
} else {
log.warn("Problem", ex);
return -1;
}
});
handle()總是被調用,結果和異常都非空,這是個一站式全方位的策略。
六、一起結合兩個CompletableFuture
非同步處理過程之一的CompletableFuture非常不錯但是當多個這樣的futures以各種方式組合在一起時確實顯示了它的強大程度。
七、結合(連結)這兩個futures(thenCompose())
有時你會想執行一些future的值(當它準備好了),但這個函數也回傳了future。 CompletableFuture足夠靈活地明白我們的函數結果現在應該作為頂級的future,對比CompletableFuture<CompletableFuture>。方法thenCompose()相當於Scala的flatMap:
複製代碼代碼如下:
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
…Async變化也是可用的,在下面的事例中,仔細觀察thenApply()(map)和thenCompose()(flatMap)的類型和差異,當應用calculateRelevance()方法返回CompletableFuture:
複製代碼代碼如下:
CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f =
docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture =
docFuture.thenCompose(this::calculateRelevance);
//...
private CompletableFuture<Double> calculateRelevance(Document doc) //...
thenCompose()是一個重要的方法允許建立健壯的和非同步的管道,沒有阻塞和等待的中間步驟。
八、兩個futures的轉換值(thenCombine())
當thenCompose()用於連結一個future時依賴另一個thenCombine,當他們都完成之後就結合兩個獨立的futures:
複製代碼代碼如下:
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
…Async變數也是可用的,假設你有兩個CompletableFuture,一個載入Customer另一個載入最近的Shop。他們彼此完全獨立,但是當他們完成時,您想要使用它們的值來計算Route。這是一個可剝奪的例子:
複製代碼代碼如下:
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture =
customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//...
private Route findRoute(Customer customer, Shop shop) //...
注意,在Java 8中可以用(cust, shop) -> findRoute(cust, shop)簡單地取代this::findRoute方法的參考:
複製代碼代碼如下:
customerFuture.thenCombine(shopFuture, this::findRoute);
你也知道,我們有customerFuture 和shopFuture。那麼routeFuture包裝它們然後“等待”它們完成。當他們準備好了,它會運行我們提供的函數來結合所有的結果(findRoute())。當兩個基本的futures完成並且findRoute()也完成時,這樣routeFuture將會完成。
九、等待所有的CompletableFutures 完成
如果不是產生新的CompletableFuture連接這兩個結果,我們只是希望在完成時得到通知,我們可以使用thenAcceptBoth()/runAfterBoth()系列的方法,(…Async 變數也是可用的)。它們的運作方式與thenAccept() 和thenRun()類似,但是等待兩個futures而不是一個:
複製代碼代碼如下:
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)
想像一下上面的例子,這不是產生新的CompletableFuture,你只是想要立刻發送一些事件或刷新GUI。這可以很容易地實現:thenAcceptBoth():
複製代碼代碼如下:
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
final Route route = findRoute(cust, shop);
//refresh GUI with route
});
我希望我是錯的,但也許有些人會問自己一個問題:為什麼我不能簡單地阻塞這兩個futures呢? 就像:
複製代碼代碼如下:
Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());
好了,你當然可以這麼做。但最關鍵的一點是CompletableFuture是允許非同步的,它是事件驅動的程式設計模型而不是阻塞並急切地等待結果。所以在功能上,上面兩部分程式碼是等價的,但後者沒有必要佔用一個執行緒來執行。
十、等待第一個CompletableFuture 來完成任務
另一個有趣的事是CompletableFutureAPI可以等待第一個(與所有相反)完成的future。當你有兩個相同類型任務的結果時就顯得非常方便,你只要關心回應時間就行了,沒有哪個任務是優先的。 API方法(…Async變數也是可用的):
複製代碼代碼如下:
CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)
作為一個例子,你有兩個系統可以整合。一個具有較小的平均反應時間但是擁有高的標準差,另一個一般情況下較慢,但是更加容易預測。為了兩全其美(性能和可預測性)你可以在同一時間調用兩個系統並等著誰先完成。通常這會是第一個系統,但是在進度緩慢時,第二個系統就可以在可接受的時間內完成:
複製代碼代碼如下:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
System.out.println("Result: " + s);
});
s代表了從fetchFast()或fetchPredictably()得到的String。我們不必知道也無需關心。
十一、完整地轉換第一個系統
applyToEither()算是acceptEither()的前輩了。當兩個futures快要完成時,後者只是簡單地呼叫一些程式碼片段,applyToEither()將會傳回一個新的future。當這兩個最初的futures完成時,新的future也會完成。 API有點類似(…Async 變數也是可用的):
複製程式碼如下:<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)
這個額外的fn功能在第一個future被呼叫時能完成。我不確定這個專業化方法的目的是什麼,畢竟一個人可以簡單地使用:fast.applyToEither(predictable).thenApply(fn)。因為我們堅持用這個API,但我們的確不需要額外功能的應用程序,我會簡單地使用Function.identity()佔位符:
複製代碼代碼如下:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone =
fast.applyToEither(predictable, Function.<String>identity());
第一個完成的future可以透過運作。請注意,從客戶的角度來看,兩個futures實際上是在firstDone的後面而隱藏的。客戶端只是等待著future來完成並且透過applyToEither()使得當最先的兩個任務完成時通知客戶端。
十二、多種結合的CompletableFuture
我們現在知道如何等待兩個future來完成(使用thenCombine())並第一個完成(applyToEither())。但它可以擴展到任意數量的futures嗎?的確,使用static輔助方法:
複製代碼代碼如下:
static CompletableFuture<Void< allOf(CompletableFuture<?<... cfs)
static CompletableFuture<Object< anyOf(CompletableFuture<?<... cfs)
allOf()當所有的潛在futures完成時,使用了一個futures數組並且傳回一個future(等待所有的障礙)。另一方面anyOf()將會等待最快的潛在futures,請看一下返回futures的一般類型,這不是你所期望的嗎?我們會在接下來的文章中關註一下這個問題。
總結
我們探索了整個CompletableFuture API。我確信這樣就能戰無不勝了,所以在下一篇文章中我們將研究另一個簡單的web爬蟲程序的實現,使用CompletableFuture方法和Java 8 lambda表達式,我們也會看看CompletableFuture的