Java 8 ist da, es ist Zeit, etwas Neues zu lernen. Bei Java 7 und Java 6 handelt es sich nur um leicht modifizierte Versionen, bei Java 8 wird es jedoch erhebliche Verbesserungen geben. Vielleicht ist Java 8 zu groß? Heute werde ich Ihnen die neue Abstraktion CompletableFuture in JDK 8 ausführlich erläutern. Wie wir alle wissen, wird Java 8 in weniger als einem Jahr veröffentlicht, daher basiert dieser Artikel auf JDK 8 Build 88 mit Lambda-Unterstützung. CompletableFuture erweitert Future um Methoden, unäre Operatoren und fördert Asynchronität sowie ein ereignisgesteuertes Programmiermodell, das auch bei älteren Java-Versionen nicht Halt macht. Wenn Sie das JavaDoc von CompletableFuture öffnen, werden Sie schockiert sein. Es gibt ungefähr fünfzig Methoden (!), und einige davon sind sehr interessant und schwer zu verstehen, zum Beispiel:
Kopieren Sie den Code wie folgt: public <U,V> CompletableFuture<V> thenCombineAsync(
CompletableFuture<? erweitert U> andere,
BiFunction<? super T,? super U,? erweitert V> fn,
Testamentsvollstrecker (Vollstrecker)
Machen Sie sich keine Sorgen, lesen Sie weiter. CompletableFuture sammelt alle Merkmale von ListenableFuture in Guava und SettableFuture. Darüber hinaus bringen integrierte Lambda-Ausdrücke es Scala/Akka-Futures näher. Das mag zu schön klingen, um wahr zu sein, aber lesen Sie weiter. CompletableFuture hat zwei Hauptaspekte, die dem asynchronen Rückruf/der asynchronen Konvertierung von Future in ol überlegen sind, wodurch der Wert von CompletableFuture jederzeit von jedem Thread aus festgelegt werden kann.
1. Extrahieren und ändern Sie den Wert des Pakets
Oft stellen Futures Code dar, der in anderen Threads ausgeführt wird, aber das ist nicht immer der Fall. Manchmal möchten Sie eine Zukunft erstellen, um anzuzeigen, dass Sie wissen, was passieren wird, beispielsweise das Eintreffen einer JMS-Nachricht. Sie haben also eine Zukunft, aber keine potenzielle asynchrone Arbeit in der Zukunft. Sie möchten lediglich erledigt (aufgelöst) werden, wenn eine zukünftige JMS-Nachricht eintrifft, die durch ein Ereignis gesteuert wird. In diesem Fall können Sie einfach eine CompletableFuture erstellen, um zu Ihrem Client zurückzukehren, und einfach „complete()“ entsperrt alle Clients, die auf die Zukunft warten, solange Sie glauben, dass Ihr Ergebnis verfügbar ist.
Zuerst können Sie einfach ein neues CompletableFuture erstellen und es Ihrem Kunden geben:
Kopieren Sie den Code wie folgt: public CompletableFuture<String> ask() {
final CompletableFuture<String> future = new CompletableFuture<>();
//......
Rückkehr in die Zukunft;
}
Beachten Sie, dass diese Zukunft keine Verbindung zu Callable hat, keinen Thread-Pool hat und nicht asynchron funktioniert. Wenn der Clientcode jetzt ask().get() aufruft, wird er für immer blockiert. Wenn die Register den Rückruf abschließen, werden sie nie wirksam. Was ist also der Schlüssel? Jetzt können Sie sagen:
Kopieren Sie den Code wie folgt: future.complete("42")
...In diesem Moment erhalten alle Future.get()-Clients das Ergebnis der Zeichenfolge und werden sofort nach Abschluss des Rückrufs wirksam. Dies ist sehr praktisch, wenn Sie die Aufgabe einer Zukunft darstellen möchten und nicht die Aufgabe eines Ausführungsthreads berechnen müssen. CompletableFuture.complete() kann nur einmal aufgerufen werden, nachfolgende Aufrufe werden ignoriert. Es gibt aber auch eine Hintertür namens CompletableFuture.obtrudeValue(...), die den vorherigen Wert einer neuen Zukunft überschreibt, also verwenden Sie sie bitte mit Vorsicht.
Manchmal möchten Sie sehen, was passiert, wenn ein Signal fehlschlägt, da Sie wissen, dass ein Future-Objekt das darin enthaltene Ergebnis oder die darin enthaltene Ausnahme verarbeiten kann. Wenn Sie einige Ausnahmen weiter übergeben möchten, können Sie CompletableFuture.completeExceptionally(ex) verwenden (oder eine leistungsfähigere Methode wie obtrudeException(ex) verwenden, um die vorherige Ausnahme zu überschreiben). CompleteExceptionally() entsperrt auch alle wartenden Clients, löst dieses Mal jedoch eine Ausnahme von get() aus. Apropos get(): Es gibt auch die Methode CompletableFuture.join() mit geringfügigen Änderungen in der Fehlerbehandlung. Aber im Großen und Ganzen sind sie alle gleich. Schließlich gibt es noch die Methode CompletableFuture.getNow(valueIfAbsent), die nicht blockiert, sondern den Standardwert zurückgibt, wenn die Zukunft noch nicht abgeschlossen ist, was sie beim Aufbau robuster Systeme, bei denen wir nicht zu lange warten möchten, sehr nützlich macht.
Die letzte statische Methode besteht darin, CompletedFuture (Wert) zu verwenden, um das vollständige Future-Objekt zurückzugeben. Dies kann beim Testen oder Schreiben einiger Adapterschichten sehr nützlich sein.
2. Erstellen und erhalten Sie CompletableFuture
Okay, also ist die manuelle Erstellung eines CompletableFuture unsere einzige Option? unsicher. Genau wie normale Futures können wir bestehende Aufgaben verknüpfen, und CompletableFuture verwendet Factory-Methoden:
Kopieren Sie den Codecode wie folgt:
static <U> CompletableFuture<U> SupplyAsync(Supplier<U> Lieferant);
static <U> CompletableFuture<U> SupplyAsync(Supplier<U> Lieferant, Executor Executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
Die parameterlose Methode Executor endet mit ...Async und verwendet ForkJoinPool.commonPool() (globaler, gemeinsamer Pool, eingeführt in JDK8), was für die meisten Methoden in der CompletableFuture-Klasse gilt. runAsync() ist leicht zu verstehen. Beachten Sie, dass es ein Runnable erfordert und daher CompletableFuture<Void> zurückgibt, da das Runnable keinen Wert zurückgibt. Wenn Sie asynchrone Vorgänge verarbeiten und Ergebnisse zurückgeben müssen, verwenden Sie Supplier<U>:
Kopieren Sie den Codecode wie folgt:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
öffentlicher String get() {
//...langfristig...
Rückgabe „42“;
}
}, Testamentsvollstrecker);
Aber vergessen Sie nicht, dass es in Java 8 Lambdas-Ausdrücke gibt!
Kopieren Sie den Codecode wie folgt:
finalCompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//...langfristig...
Rückgabe „42“;
}, Testamentsvollstrecker);
oder:
Kopieren Sie den Codecode wie folgt:
final CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);
Obwohl es in diesem Artikel nicht um Lambdas geht, verwende ich Lambda-Ausdrücke recht häufig.
3. Konvertierung und Aktion auf CompletableFuture(thenApply)
Ich sagte, CompletableFuture sei besser als Future, aber wissen Sie nicht warum? Einfach ausgedrückt, weil CompletableFuture ein Atom und ein Faktor ist. Ist das, was ich gesagt habe, nicht hilfreich? Sowohl Scala als auch JavaScript ermöglichen es Ihnen, einen asynchronen Rückruf zu registrieren, wenn ein Future abgeschlossen ist, und wir müssen nicht warten und ihn blockieren, bis er bereit ist. Wir können einfach sagen: Wenn Sie diese Funktion ausführen, erscheint das Ergebnis. Darüber hinaus können wir diese Funktionen stapeln, mehrere Futures miteinander kombinieren usw. Wenn wir beispielsweise von String in Integer konvertieren, können wir ohne Assoziation von CompletableFuture in CompletableFuture<Integer konvertieren. Dies geschieht über thenApply():
Kopieren Sie den Codecode wie folgt:
<U> CompletableFuture<U> thenApply(Function<? super T,? erweitert U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? erweitert U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? erweitert U> fn, Executor executor);<p></p>
<p>Wie bereits erwähnt... stellt die Async-Version die meisten Operationen auf CompletableFuture bereit, daher werde ich sie in späteren Abschnitten überspringen. Denken Sie daran, dass die erste Methode die Methode im selben Thread aufruft, in dem die Zukunft abgeschlossen ist, während die restlichen beiden sie asynchron in verschiedenen Thread-Pools aufrufen.
Werfen wir einen Blick auf den Workflow von thenApply():</p>
<p><pre>
CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
</p>
Oder in einer Aussage:
Kopieren Sie den Codecode wie folgt:
CompletableFuture<Double> f3 =
f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
Hier sehen Sie die Konvertierung einer Sequenz von String zu Integer zu Double. Am wichtigsten ist jedoch, dass diese Transformationen weder sofort ausgeführt noch gestoppt werden. Diese Transformationen werden weder sofort ausgeführt noch gestoppt. Sie erinnern sich einfach an das Programm, das sie ausgeführt haben, als die ursprüngliche f1 abgeschlossen war. Wenn bestimmte Transformationen sehr zeitaufwändig sind, können Sie Ihren eigenen Executor bereitstellen, um sie asynchron auszuführen. Beachten Sie, dass dieser Vorgang einer unären Karte in Scala entspricht.
4. Führen Sie den fertigen Code aus (thenAccept/thenRun)
Kopieren Sie den Codecode wie folgt:
CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Ausführbare Aktion);
In zukünftigen Pipelines gibt es zwei typische „Endstufen“-Methoden. Sie werden vorbereitet, wenn Sie den Wert der Zukunft verwenden. Wenn thenAccept() den endgültigen Wert bereitstellt, führt thenRun das Runnable aus, das nicht einmal über eine Möglichkeit verfügt, den Wert zu berechnen. Zum Beispiel:
Kopieren Sie den Codecode wie folgt:
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Fortfahren");
...Asynchrone Variablen sind auch auf zwei Arten verfügbar: implizite und explizite Executoren, und ich werde diese Methode nicht zu sehr betonen.
Die Methoden thenAccept()/thenRun() blockieren nicht (auch wenn kein expliziter Executor vorhanden ist). Sie sind wie ein Ereignis-Listener/-Handler, der für einen bestimmten Zeitraum ausgeführt wird, wenn Sie ihn mit einem Future verbinden. Die Meldung „Fortfahren“ erscheint sofort, obwohl die Zukunft noch nicht einmal abgeschlossen ist.
5. Fehlerbehandlung eines einzelnen CompletableFuture
Bisher haben wir nur die Ergebnisse der Berechnungen besprochen. Was ist mit Ausnahmen? Können wir sie asynchron verarbeiten? sicherlich!
Kopieren Sie den Codecode wie folgt:
CompletableFuture<String> sicher =
future.Exceptionally(ex -> „Wir haben ein Problem:“ + ex.getMessage());
Wenn „Exceptionally()“ eine Funktion akzeptiert, wird die ursprüngliche Zukunft aufgerufen, um eine Ausnahme auszulösen. Wir werden die Möglichkeit haben, diese Ausnahme zur Wiederherstellung in einen mit dem Future-Typ kompatiblen Wert umzuwandeln. SafeFurther-Konvertierungen lösen keine Ausnahme mehr aus, sondern geben stattdessen einen String-Wert von der Funktion zurück, die die Funktionalität bereitstellt.
Ein flexiblerer Ansatz besteht darin, dass handle() eine Funktion akzeptiert, die das richtige Ergebnis oder die richtige Ausnahme empfängt:
Kopieren Sie den Codecode wie folgt:
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
if (ok != null) {
return Integer.parseInt(ok);
} anders {
log.warn("Problem", ex);
return -1;
}
});
handle() wird immer aufgerufen und die Ergebnisse und Ausnahmen sind ungleich Null. Dies ist eine Allround-Strategie aus einer Hand.
6. Kombinieren Sie zwei CompletableFutures miteinander
CompletableFuture als einer der asynchronen Prozesse ist großartig, aber es zeigt wirklich, wie mächtig es ist, wenn mehrere solcher Futures auf unterschiedliche Weise kombiniert werden.
7. Kombinieren (verknüpfen) Sie diese beiden Futures (thenCompose())
Manchmal möchten Sie den Wert eines Futures verwenden (wenn er bereit ist), aber diese Funktion gibt auch einen Future zurück. CompletableFuture ist flexibel genug, um zu verstehen, dass unser Funktionsergebnis jetzt als Top-Level-Future im Vergleich zu CompletableFuture<CompletableFuture> verwendet werden sollte. Die Methode thenCompose() entspricht Scalas flatMap:
Kopieren Sie den Codecode wie folgt:
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
...Asynchrone Varianten sind ebenfalls verfügbar. Beachten Sie sorgfältig die Typen und Unterschiede zwischen thenApply()(map) und thenCompose()(flatMap).
Kopieren Sie den Codecode wie folgt:
CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f =
docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture =
docFuture.thenCompose(this::calculateRelevance);
//......
private CompletableFuture<Double> berechneRelevance(Dokumentdokument) //...
thenCompose() ist eine wichtige Methode, die den Aufbau robuster und asynchroner Pipelines ermöglicht, ohne Zwischenschritte zu blockieren und zu warten.
8. Umrechnungswerte zweier Futures (thenCombine())
Wenn thenCompose() verwendet wird, um einen Future zu verketten, der von einem anderen thenCombine abhängt, werden nach Abschluss beider Futures die beiden unabhängigen Futures kombiniert:
Kopieren Sie den Codecode wie folgt:
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? erweitert U> other, BiFunction<? super T,? super U,? erweitert V> fn)
...Asynchrone Variablen sind ebenfalls verfügbar, vorausgesetzt, Sie haben zwei CompletableFutures, von denen eines den Kunden und das andere den aktuellen Shop lädt. Sie sind völlig unabhängig voneinander, aber wenn sie fertig sind, möchten Sie ihre Werte zur Berechnung der Route verwenden. Hier ist ein entbehrliches Beispiel:
Kopieren Sie den Codecode wie folgt:
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closeShop();
CompletableFuture<Route> routeFuture =
customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//......
private Route findRoute(Customer customer, Shop shop) //...
Bitte beachten Sie, dass Sie in Java 8 den Verweis auf die Methode this::findRoute einfach durch (cust, shop) -> findRoute(cust, shop) ersetzen können:
Kopieren Sie den Codecode wie folgt:
customerFuture.thenCombine(shopFuture, this::findRoute);
Wie Sie wissen, haben wir customerFuture und shopFuture. Dann verpackt sie die RouteFuture und „wartet“, bis sie abgeschlossen ist. Wenn sie bereit sind, wird die von uns bereitgestellte Funktion ausgeführt, um alle Ergebnisse zu kombinieren (findRoute()). Diese RouteFuture wird abgeschlossen, wenn die beiden Basis-Futures abgeschlossen sind und findRoute() ebenfalls abgeschlossen ist.
9. Warten Sie, bis alle CompletableFutures abgeschlossen sind
Wenn wir, anstatt ein neues CompletableFuture zu generieren, das diese beiden Ergebnisse verbindet, nur benachrichtigt werden möchten, wenn es abgeschlossen ist, können wir die Methodenreihe thenAcceptBoth()/runAfterBoth() verwenden (...Async-Variablen sind ebenfalls verfügbar). Sie funktionieren ähnlich wie thenAccept() und thenRun(), warten jedoch auf zwei Futures statt auf eine:
Kopieren Sie den Codecode wie folgt:
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? erweitert U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)
Stellen Sie sich das obige Beispiel vor: Anstatt ein neues CompletableFuture zu generieren, möchten Sie einfach nur einige Ereignisse senden oder die GUI sofort aktualisieren. Dies kann leicht erreicht werden: thenAcceptBoth():
Kopieren Sie den Codecode wie folgt:
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
final Route route = findRoute(cust, shop);
//GUI mit Route aktualisieren
});
Ich hoffe, dass ich falsch liege, aber vielleicht stellt sich der ein oder andere die Frage: Warum kann ich diese beiden Zukünfte nicht einfach blockieren? Wie:
Kopieren Sie den Codecode wie folgt:
Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closeShop();
findRoute(customerFuture.get(), shopFuture.get());
Nun, natürlich können Sie das tun. Der kritischste Punkt ist jedoch, dass CompletableFuture Asynchronität ermöglicht. Es handelt sich um ein ereignisgesteuertes Programmiermodell, anstatt zu blockieren und sehnsüchtig auf Ergebnisse zu warten. Funktionell sind die beiden oben genannten Codeteile also gleichwertig, letzterer muss jedoch keinen Thread zur Ausführung belegen.
10. Warten Sie, bis das erste CompletableFuture die Aufgabe abgeschlossen hat
Eine weitere interessante Sache ist, dass die CompletableFutureAPI auf den Abschluss der ersten (und nicht aller) Zukunft warten kann. Dies ist sehr praktisch, wenn Sie die Ergebnisse von zwei Aufgaben desselben Typs haben. Sie kümmern sich nur um die Reaktionszeit und keine Aufgabe hat Priorität. API-Methoden (…Async-Variablen sind ebenfalls verfügbar):
Kopieren Sie den Codecode wie folgt:
CompletableFuture<Void> AcceptEither(CompletableFuture<? Extends T> Other, Consumer<? Super T> Block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)
Als Beispiel haben Sie zwei Systeme, die integriert werden können. Einer hat eine kürzere durchschnittliche Reaktionszeit, aber eine hohe Standardabweichung, der andere ist im Allgemeinen langsamer, aber vorhersehbarer. Um das Beste aus beiden Welten (Leistung und Vorhersehbarkeit) zu erhalten, können Sie beide Systeme gleichzeitig aufrufen und warten, bis das System zuerst fertig ist. Normalerweise ist dies das erste System, aber wenn der Fortschritt langsam wird, kann das zweite System in akzeptabler Zeit fertig werden:
Kopieren Sie den Codecode wie folgt:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> Predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
System.out.println("Ergebnis: " + s);
});
s stellt den von fetchFast() oder fetchPredictably() erhaltenen String dar. Wir müssen es nicht wissen oder uns darum kümmern.
11. Das erste System komplett umbauen
applyToEither() gilt als Vorgänger von AcceptEither(). Wenn zwei Futures kurz vor dem Abschluss stehen, ruft letzterer einfach einen Codeausschnitt auf und applyToEither() gibt einen neuen Future zurück. Wenn diese beiden anfänglichen Futures abgeschlossen sind, wird auch die neue Zukunft abgeschlossen sein. Die API ist etwas ähnlich (...Async-Variablen sind ebenfalls verfügbar):
Kopieren Sie den Code wie folgt:<U> CompletableFuture<U> applyToEither(CompletableFuture<? erweitert T> other, Function<? super T,U> fn)
Diese zusätzliche fn-Funktion kann beim Aufruf des ersten Futures abgeschlossen werden. Ich bin mir nicht sicher, was der Zweck dieser speziellen Methode ist, schließlich könnte man einfach verwenden: fast.applyToEither(predictable).thenApply(fn). Da wir bei dieser API hängen bleiben, die zusätzliche Funktionalität für die Anwendung aber nicht wirklich benötigen, verwende ich einfach den Platzhalter Function.identity():
Kopieren Sie den Codecode wie folgt:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> Predictable = fetchPredictably();
CompletableFuture<String> firstDone =
fast.applyToEither(predictable, Function.<String>identity());
Der erste abgeschlossene Future kann ausgeführt werden. Beachten Sie, dass aus Sicht des Kunden tatsächlich beide Zukunftsaussichten hinter firstDone verborgen sind. Der Client wartet einfach auf den Abschluss der Zukunft und verwendet applyToEither(), um den Client zu benachrichtigen, wenn die ersten beiden Aufgaben abgeschlossen sind.
12. CompletableFuture mit mehreren Kombinationen
Wir wissen jetzt, wie man auf den Abschluss zweier Futures (mithilfe von thenCombine()) und auf den Abschluss des ersten Futures (applyToEither()) wartet. Aber kann es auf eine beliebige Anzahl von Futures skaliert werden? Verwenden Sie tatsächlich statische Hilfsmethoden:
Kopieren Sie den Codecode wie folgt:
static CompletableFuture<Void< allOf(CompletableFuture<?<... cfs)
static CompletableFuture<Object< anyOf(CompletableFuture<?<... cfs)
allOf() verwendet ein Array von Futures und gibt einen Future zurück (der auf alle Hindernisse wartet), wenn alle potenziellen Futures abgeschlossen sind. Andererseits wartet anyOf() auf die schnellsten potenziellen Futures. Bitte schauen Sie sich die allgemeine Art der zurückgegebenen Futures an. Wir werden uns im nächsten Artikel auf dieses Thema konzentrieren.
Zusammenfassen
Wir haben die gesamte CompletableFuture-API untersucht. Ich bin davon überzeugt, dass dies unbesiegbar sein wird, daher werden wir uns im nächsten Artikel mit der Implementierung eines weiteren einfachen Webcrawlers mit CompletableFuture-Methoden und Java 8-Lambda-Ausdrücken befassen