Java 8 уже здесь, пришло время узнать что-то новое. Java 7 и Java 6 — это лишь слегка модифицированные версии, но в Java 8 будут существенные улучшения. Может быть, Java 8 слишком велика? Сегодня я дам вам подробное объяснение новой абстракции CompletableFuture в JDK 8. Как мы все знаем, Java 8 выйдет менее чем через год, поэтому эта статья основана на JDK 8 build 88 с поддержкой лямбда-выражений. CompletableFuture расширяет Future, предоставляет методы, унарные операторы и способствует асинхронности и модели программирования, управляемой событиями, которая не ограничивается более старыми версиями Java. Если вы откроете JavaDoc CompletableFuture, вы будете шокированы. Существует около пятидесяти методов (!), и некоторые из них очень интересны и сложны для понимания, например:
Скопируйте код следующим образом: public <U,V> CompletableFuture<V> thenCombineAsync(
CompletableFuture<? расширяет U> другое,
BiFunction<? супер T,? супер U,? расширяет V> fn,
Исполнитель-исполнитель)
Не волнуйтесь, продолжайте читать. CompletableFuture собирает все характеристики ListenableFuture в Guava и SettableFuture. Кроме того, встроенные лямбда-выражения приближают его к будущему Scala/Akka. Возможно, это звучит слишком хорошо, чтобы быть правдой, но читайте дальше. CompletableFuture имеет два основных аспекта, которые превосходят асинхронный обратный вызов/преобразование Future в ol, что позволяет устанавливать значение CompletableFuture из любого потока в любое время.
1. Извлеките и измените значение пакета.
Часто фьючерсы представляют собой код, выполняющийся в других потоках, но это не всегда так. Иногда вам нужно создать будущее, чтобы указать, что вы знаете, что произойдет, например, прибытие сообщения JMS. Итак, у вас есть Будущее, но нет потенциальной асинхронной работы в будущем. Вы просто хотите, чтобы все было готово (разрешено), когда прибудет будущее сообщение JMS, вызванное событием. В этом случае вы можете просто создать CompletableFuture для возврата к вашему клиенту, а просто Complete() разблокирует всех клиентов, ожидающих Future, пока вы считаете, что ваш результат доступен.
Сначала вы можете просто создать новый CompletableFuture и передать его своему клиенту:
Скопируйте код следующим образом: public CompletableFuture<String> Ask() {
окончательный CompletableFuture<String> будущее = новый CompletableFuture<>();
//...
вернуть будущее;
}
Обратите внимание, что это будущее не имеет связи с Callable, нет пула потоков и не работает асинхронно. Если клиентский код теперь вызывает Ask().get(), он будет заблокирован навсегда. Если регистры завершат обратный вызов, они никогда не вступят в силу. Так в чем же ключ? Теперь вы можете сказать:
Скопируйте код следующим образом: Future.complete("42")
...В этот момент все клиенты Future.get() получат результат строки, и он вступит в силу сразу после завершения обратного вызова. Это очень удобно, когда вы хотите представить задачу Future, и нет необходимости вычислять задачу какого-то потока выполнения. CompletableFuture.complete() можно вызвать только один раз, последующие вызовы будут игнорироваться. Но существует также бэкдор под названием CompletableFuture.obtrumeValue(...), который перезаписывает предыдущее значение нового Future, поэтому используйте его с осторожностью.
Иногда вам хочется посмотреть, что произойдет в случае сбоя сигнала, поскольку вы знаете, что объект Future может обрабатывать содержащийся в нем результат или исключение. Если вы хотите передать некоторые исключения дальше, вы можете использовать CompletableFuture.completeExceptionally(ex) (или использовать более мощный метод, например obtrumeException(ex), чтобы переопределить предыдущее исключение). CompleteExceptionally() также разблокирует всех ожидающих клиентов, но на этот раз генерирует исключение из get(). Говоря о get(), существует также метод CompletableFuture.join() с небольшими изменениями в обработке ошибок. Но в целом они все одинаковые. Наконец, есть метод CompletableFuture.getNow(valueIfAbsent), который не блокирует, но возвращает значение по умолчанию, если Future еще не завершено, что делает его очень полезным при построении надежных систем, где мы не хотим ждать слишком долго.
Последний статический метод — использовать CompleteFuture(value) для возврата завершенного объекта Future, что может быть очень полезно при тестировании или написании некоторых слоев адаптера.
2. Создайте и получите CompletableFuture.
Хорошо, значит, создание CompletableFuture вручную — наш единственный вариант? неопределенный. Как и в случае с обычными Futures, мы можем связывать существующие задачи, а CompletableFuture использует фабричные методы:
Скопируйте код кода следующим образом:
статический <U> CompletableFuture<U> SupplyAsync (Поставщик<U> поставщик);
статический <U> CompletableFuture<U> SupplyAsync (Поставщик<U>, Исполнитель);
статический CompletableFuture<Void> runAsync(Runnable runnable);
статический CompletableFuture<Void> runAsync (Runnable runnable, Исполнитель-исполнитель);
Метод Executor без параметров заканчивается на...Async и будет использовать ForkJoinPool.commonPool() (глобальный общий пул, представленный в JDK8), который применяется к большинству методов класса CompletableFuture. runAsync() легко понять, обратите внимание, что для него требуется Runnable, поэтому он возвращает CompletableFuture<Void>, поскольку Runnable не возвращает значения. Если вам нужно обрабатывать асинхронные операции и возвращать результаты, используйте Поставщик<U>:
Скопируйте код кода следующим образом:
Final CompletableFuture<String> Future = CompletableFuture.supplyAsync(new Поставщик<String>() {
@Override
публичная строка get() {
//...долго работает...
вернуть «42»;
}
}, исполнитель);
Но не забывайте, что в Java 8 есть лямбда-выражения!
Скопируйте код кода следующим образом:
FinalCompletableFuture<String> будущее = CompletableFuture.supplyAsync(() -> {
//...долго работает...
вернуть «42»;
}, исполнитель);
или:
Скопируйте код кода следующим образом:
окончательный CompletableFuture<String> будущее =
CompletableFuture.supplyAsync(() -> longRunningTask(params), исполнитель);
Хотя эта статья не о лямбда-выражениях, я использую лямбда-выражения довольно часто.
3. Преобразование и действие над CompletableFuture(thenApply)
Я сказал, что CompletableFuture лучше, чем Future, но разве вы не знаете, почему? Проще говоря, потому что CompletableFuture — это атом и фактор. Разве то, что я сказал, не помогло? И Scala, и JavaScript позволяют вам регистрировать асинхронный обратный вызов после завершения будущего, и нам не нужно ждать и блокировать его, пока он не будет готов. Можно просто сказать: при запуске этой функции появляется результат. Кроме того, мы можем объединять эти функции, объединять несколько фьючерсов и т. д. Например, если мы преобразуем String в Integer, мы можем преобразовать CompletableFuture в CompletableFuture<Integer без ассоциации. Это делается с помощью thenApply():
Скопируйте код кода следующим образом:
<U> CompletableFuture<U> thenApply(Function<? super T,? расширяет U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? расширяет U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? расширяет U> fn, исполнитель-исполнитель);<p></p>
<p>Как уже упоминалось... версия Async обеспечивает большинство операций над CompletableFuture, поэтому я пропущу их в последующих разделах. Помните, что первый метод будет вызывать метод в том же потоке, где завершается фьючерс, а остальные два будут вызывать его асинхронно в разных пулах потоков.
Давайте посмотрим на рабочий процесс thenApply():</p>
<p><pre>
CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
</p>
Или в заявлении:
Скопируйте код кода следующим образом:
CompletableFuture<Double> f3 =
f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
Здесь вы увидите преобразование последовательности из строки в целое число в двойное. Но самое главное, эти преобразования не выполняются ни немедленно, ни останавливаются. Эти преобразования не выполняются немедленно и не останавливаются. Они просто запоминают программу, которую выполнили после завершения исходной f1. Если определенные преобразования занимают очень много времени, вы можете предоставить собственный Executor для их асинхронного выполнения. Обратите внимание, что эта операция эквивалентна унарному отображению в Scala.
4. Запустите готовый код (thenAccept/thenRun).
Скопируйте код кода следующим образом:
CompletableFuture<Void> thenAccept(Consumer<? super T> блок);
CompletableFuture<Void> thenRun (Выполняемое действие);
В будущих конвейерах есть два типичных метода «заключительного» этапа. Они подготавливаются, когда вы используете будущее значение. Когда thenAccept() предоставляет окончательное значение, thenRun выполняет Runnable, у которого даже нет способа вычислить значение. Например:
Скопируйте код кода следующим образом:
Future.thenAcceptAsync(dbl -> log.debug("Результат: {}", dbl), исполнитель);
log.debug("Продолжение");
... Асинхронные переменные также доступны двумя способами: неявными и явными исполнителями, и я не буду слишком подчеркивать этот метод.
Методы thenAccept()/thenRun() не блокируются (даже если нет явного исполнителя). Они похожи на прослушиватель/обработчик событий, который будет выполняться в течение определенного периода времени, когда вы подключите его к будущему. Сразу появится сообщение «Продолжение», хотя будущее еще даже не завершено.
5. Обработка ошибок одного CompletableFuture
До сих пор мы обсуждали только результаты расчетов. А как насчет исключений? Можем ли мы обрабатывать их асинхронно? конечно!
Скопируйте код кода следующим образом:
CompletableFuture<String> безопасно =
Future.Exceptionally(ex -> "У нас проблема: " + ex.getMessage());
Когда uniquely() принимает функцию, будет вызываться исходное будущее, чтобы выдать исключение. У нас будет возможность преобразовать это исключение в какое-то значение, совместимое с типом Future, для восстановления. Преобразования SafeFurther больше не будут вызывать исключение, а вместо этого будут возвращать строковое значение из функции, обеспечивающей эту функциональность.
Более гибкий подход заключается в том, чтобы handle() принять функцию, которая получает правильный результат или исключение:
Скопируйте код кода следующим образом:
CompletableFuture<Integer> Safe = Future.handle((ok, ex) -> {
если (ок!= ноль) {
вернуть Integer.parseInt(ок);
} еще {
log.warn("Проблема", ex);
вернуть -1;
}
});
handle() вызывается всегда, а результаты и исключения не равны нулю. Это универсальная стратегия.
6. Объедините два CompletableFuture вместе
CompletableFuture как один из асинхронных процессов великолепен, но он действительно показывает, насколько он эффективен, когда несколько таких фьючерсов комбинируются различными способами.
7. Объединить (связать) эти два фьючерса (затем Compose())
Иногда вам нужно использовать какое-то будущее значение (когда оно будет готово), но эта функция также возвращает будущее. CompletableFuture достаточно гибок, чтобы понимать, что результат нашей функции теперь должен использоваться как будущее верхнего уровня, по сравнению с CompletableFuture<CompletableFuture>. Метод thenCompose() эквивалентен методу FlatMap в Scala:
Скопируйте код кода следующим образом:
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
... Также доступны асинхронные варианты. В следующем примере внимательно обратите внимание на типы и различия между thenApply()(map) и thenCompose()(flatMap). При применении метода CalculRelevance() возвращается CompletableFuture:
Скопируйте код кода следующим образом:
CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f =
docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> релевантностьFuture =
docFuture.thenCompose(this::calculateRelevance);
//...
Private CompletableFuture<Double> CalculateRelevance(Document doc) //...
thenCompose() — важный метод, который позволяет создавать надежные и асинхронные конвейеры без блокировки и ожидания промежуточных шагов.
8. Конверсионные значения двух фьючерсов (thenCombine())
Когда thenCompose() используется для связывания будущего, которое зависит от другого thenCombine, когда они оба завершены, оно объединяет два независимых фьючерса:
Скопируйте код кода следующим образом:
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? расширяет U> другое, BiFunction<? super T,? super U,? расширяет V> fn)
...Асинхронные переменные также доступны, если у вас есть два CompletableFutures, один загружает клиента, а другой загружает недавний магазин. Они полностью независимы друг от друга, но когда они будут завершены, вы захотите использовать их значения для расчета Маршрута. Вот пример депривации:
Скопируйте код кода следующим образом:
CompletableFuture<Клиент> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = ближайшийShop();
CompletableFuture<Route> RouteFuture =
customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//...
частный маршрут findRoute(Клиент-клиент, Магазин-магазин) //...
Обратите внимание, что в Java 8 вы можете просто заменить ссылку на метод this::findRoute на (cust, shop) -> findRoute(cust, shop):
Скопируйте код кода следующим образом:
customerFuture.thenCombine(shopFuture, this::findRoute);
Как вы знаете, у нас есть customerFuture и shopFuture. Затем маршрутFuture оборачивает их и «ждет» их завершения. Когда они будут готовы, он запустит предоставленную нами функцию для объединения всех результатов (findRoute()). Этот маршрут RouteFuture завершится, когда завершатся два основных фьючерса, а также завершится findRoute().
9. Дождитесь завершения всех CompletableFutures.
Если вместо создания нового CompletableFuture, соединяющего эти два результата, мы просто хотим получить уведомление о его завершении, мы можем использовать серию методов thenAcceptBoth()/runAfterBoth() (...также доступны асинхронные переменные). Они работают аналогично thenAccept() и thenRun(), но ждут двух фьючерсов вместо одного:
Скопируйте код кода следующим образом:
<U> CompletableFuture<Void> thenAcceptBoth (CompletableFuture<? расширяет U> другой, BiConsumer<? super T,? super U> блок)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> другое, действие Runnable)
Представьте себе приведенный выше пример: вместо создания нового CompletableFuture вы просто хотите немедленно отправить несколько событий или обновить графический интерфейс. Этого можно легко добиться: thenAcceptBoth():
Скопируйте код кода следующим образом:
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
окончательный маршрут маршрута = findRoute (cust, магазин);
//обновляем графический интерфейс с маршрутом
});
Надеюсь, я ошибаюсь, но, возможно, некоторые люди зададут себе вопрос: почему я не могу просто заблокировать эти два будущего? Нравиться:
Скопируйте код кода следующим образом:
Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = ближайшийShop();
findRoute(customerFuture.get(), shopFuture.get());
Ну, конечно, вы можете это сделать. Но наиболее важным моментом является то, что CompletableFuture допускает асинхронность. Это модель программирования, управляемая событиями, а не блокировка и нетерпеливое ожидание результатов. Таким образом, функционально две приведенные выше части кода эквивалентны, но для выполнения последней не требуется занимать поток.
10. Подождите, пока первый CompletableFuture завершит задачу.
Еще одна интересная вещь заключается в том, что CompletableFutureAPI может дождаться завершения первого (а не всех) будущего. Это очень удобно, когда у вас есть результаты двух однотипных задач. Вас интересует только время ответа, и ни одна задача не имеет приоритета. Методы API (…также доступны асинхронные переменные):
Скопируйте код кода следующим образом:
CompletableFuture<Void> AcceptEither (CompletableFuture<? расширяет T> другой, блок Consumer<? super T>)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> другое, действие Runnable)
Например, у вас есть две системы, которые можно интегрировать. У одного меньшее среднее время отклика, но высокое стандартное отклонение, другой, как правило, медленнее, но более предсказуем. Чтобы получить лучшее от обоих миров (производительность и предсказуемость), вы можете вызвать обе системы одновременно и дождаться того, какая из них завершится раньше. Обычно это первая система, но когда прогресс становится медленным, вторая система может завершиться за приемлемое время:
Скопируйте код кода следующим образом:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> предсказуемый = fetchPredictible();
fast.acceptEither(предсказуемый, s -> {
System.out.println("Результат: " + s);
});
s представляет строку, полученную с помощью fetchFast() или fetchPredictable(). Нам не обязательно знать или волноваться.
11. Полностью конвертировать первую систему
applyToEither() считается предшественником AcceptEither(). Когда два фьючерса приближаются к завершению, последний просто вызывает некоторый фрагмент кода, а метод applyToEither() вернет новое фьючерс. Когда эти два начальных будущего завершатся, новое будущее также завершится. API чем-то похож (...также доступны асинхронные переменные):
Скопируйте код следующим образом: <U> CompletableFuture<U> applyToEither(CompletableFuture<? расширяет T> другое, Function<? super T,U> fn)
Эту дополнительную функцию fn можно выполнить при вызове первого будущего. Я не уверен, какова цель этого специализированного метода, ведь можно просто использовать: fast.applyToEither(predictable).thenApply(fn). Поскольку мы застряли на этом API, но дополнительная функциональность приложения нам не нужна, я просто использую заполнитель Function.identity():
Скопируйте код кода следующим образом:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> предсказуемый = fetchPredictible();
CompletableFuture<String> firstDone =
fast.applyToEither(предсказуемый, Function.<String>identity());
Первое завершенное будущее можно запустить. Обратите внимание, что с точки зрения клиента оба фьючерса фактически скрыты за firstDone. Клиент просто ждет завершения будущего и использует applyToEither(), чтобы уведомить клиента о завершении первых двух задач.
12. CompletableFuture с несколькими комбинациями
Теперь мы знаем, как дождаться завершения двух фьючерсов (с помощью thenCombine()) и завершения первого (applyToEither()). Но может ли он масштабироваться на любое количество фьючерсов? Действительно, используйте статические вспомогательные методы:
Скопируйте код кода следующим образом:
статический CompletableFuture<Void< allOf(CompletableFuture<?<... cfs)
статический CompletableFuture<Object< AnyOf(CompletableFuture<?<... cfs)
allOf() использует массив фьючерсов и возвращает фьючерс (ожидая всех препятствий), когда все потенциальные фьючерсы завершены. С другой стороны, AnyOf() будет ждать самого быстрого потенциального фьючерса. Обратите внимание на общий тип возвращаемого фьючерса. Разве это не то, что вы ожидаете? Этому вопросу мы уделим внимание в следующей статье.
Подвести итог
Мы изучили весь API CompletableFuture. Я убежден, что это будет непобедимо, поэтому в следующей статье мы рассмотрим реализацию еще одного простого веб-сканера с использованием методов CompletableFuture и лямбда-выражений Java 8. Мы также рассмотрим CompletableFuture.