RXJAVA是反應性擴展的Java VM實現:用於通過使用可觀察的序列來組成異步和基於事件的程序的庫。
它擴展了觀察者模式以支持數據/事件的序列,並添加了運算符,使您可以聲明地將序列組合在一起,同時抽像對低級線程,同步,線程安全和並發數據結構等事物的擔憂。
在Wiki Home上了解有關RXJAVA的更多信息。
請閱讀3.0中不同的內容,以獲取從2.x升級時的更改和遷移信息的詳細信息。
截至2021年2月28日, 2.x版本是壽命。不會發生進一步的開發,支持,維護,PR和更新。最後一個版本的Javadoc 2.2.21將保持可訪問。
截至2018年3月31日, 1.x版本是壽命。不會發生進一步的開發,支持,維護,PR和更新。最後一個版本1.3.8的Javadoc將保持可訪問。
第一步是將RXJAVA 3包括在您的項目中,例如,作為Gradle編譯依賴性:
implementation " io.reactivex.rxjava3:rxjava:3.x.y "
(請用最新版本編號替換x
和y
:)
第二個是編寫Hello World節目:
package rxjava . examples ;
import io . reactivex . rxjava3 . core .*;
public class HelloWorld {
public static void main ( String [] args ) {
Flowable . just ( "Hello world" ). subscribe ( System . out :: println );
}
}
請注意,RXJAVA 3組件現在生活在io.reactivex.rxjava3
下,基礎類別和接口生活在io.reactivex.rxjava3.core
下。
RXJAVA 3具有幾個可以在:
io.reactivex.rxjava3.core.Flowable
:0..n流動,支持反向流和背壓io.reactivex.rxjava3.core.Observable
:0..n流,沒有背壓,io.reactivex.rxjava3.core.Single
:恰好1項或錯誤的流程,io.reactivex.rxjava3.core.Completable
:一個沒有項目的流量,但僅一個完成或錯誤信號,io.reactivex.rxjava3.core.Maybe
:一個沒有項目的流程,恰好一個項目或錯誤。RXJAVA中的數據流由一個源,零或更多中間步驟組成,然後是數據消費者或組合步驟(在其中責任通過某些方式消耗數據流):
source . operator1 (). operator2 (). operator3 (). subscribe ( consumer );
source . flatMap ( value -> source . operator1 (). operator2 (). operator3 ());
在這裡,如果我們想像自己在operator2
上,向左望向源被稱為上游。向右尋找訂閱者/消費者的右側稱為下游。當每個元素都寫在單獨的行上時,這通常更為明顯:
source
. operator1 ()
. operator2 ()
. operator3 ()
. subscribe ( consumer )
在RXJAVA的文檔中,排放,發射,項目,事件,信號,數據和消息被視為同義詞,代表沿數據流傳播的對象。
當數據流通過異步步驟運行時,每個步驟都可能以不同的速度執行不同的事情。為了避免壓倒性的步驟,通常會出現由於暫時緩衝或需要跳過/刪除數據而增加的內存使用情況,應用了所謂的背壓,這是流程控制的一種形式,這些步驟可以在其中表達多少個項目他們準備好處理了嗎?這允許在通常沒有辦法了解上游將發送多少個項目的情況下,限制數據流的內存使用情況。
在rxjava中,專用Flowable
類被指定為支持背壓, Observable
到的非封閉式操作(簡短序列,GUI相互作用等)。其他類型的Single
, Maybe
和Completable
不支持背壓,也不應;總是有臨時存儲一件物品的空間。
通過應用各種中間運算符來製備數據流程在所謂的組裝時間內發生:
Flowable < Integer > flow = Flowable . range ( 1 , 5 )
. map ( v -> v * v )
. filter ( v -> v % 3 == 0 )
;
在這一點上,數據還沒有流動,也沒有發生副作用。
當調用subscribe()
流動以建立內部處理步驟鏈的流程時,這是一個臨時狀態:
flow . subscribe ( System . out :: println )
這是觸發訂閱副作用的時候(請參閱doOnSubscribe
)。一些來源在此狀態下立即阻止或開始發射項目。
這是流量積極發射項目,錯誤或完成信號的狀態:
Observable . create ( emitter -> {
while (! emitter . isDisposed ()) {
long time = System . currentTimeMillis ();
emitter . onNext ( time );
if ( time % 2 != 0 ) {
emitter . onError ( new IllegalStateException ( "Odd millisecond!" ));
break ;
}
}
})
. subscribe ( System . out :: println , Throwable :: printStackTrace );
實際上,這是上面給定示例的主體執行的時候。
RXJAVA的常見用例之一是在背景線程上運行一些計算,網絡請求,並在UI線程上顯示結果(或錯誤):
import io . reactivex . rxjava3 . schedulers . Schedulers ;
Flowable . fromCallable (() -> {
Thread . sleep ( 1000 ); // imitate expensive computation
return "Done" ;
})
. subscribeOn ( Schedulers . io ())
. observeOn ( Schedulers . single ())
. subscribe ( System . out :: println , Throwable :: printStackTrace );
Thread . sleep ( 2000 ); // <--- wait for the flow to finish
這種鍊式方法的樣式稱為流利的API ,類似於構建器圖案。但是,rxjava的反應性類型是不變的。每個方法調用都會返回一個具有添加行為的新Flowable
。為了說明,可以按以下方式重寫該示例:
Flowable < String > source = Flowable . fromCallable (() -> {
Thread . sleep ( 1000 ); // imitate expensive computation
return "Done" ;
});
Flowable < String > runBackground = source . subscribeOn ( Schedulers . io ());
Flowable < String > showForeground = runBackground . observeOn ( Schedulers . single ());
showForeground . subscribe ( System . out :: println , Throwable :: printStackTrace );
Thread . sleep ( 2000 );
通常,您可以通過subscribeOn
將計算或阻止IO到其他線程。一旦數據準備就緒,您可以通過observeOn
確保它們在前景或GUI線程上處理。
RXJAVA運算符不直接與Thread
S或ExecutorService
服務合作,而是使用所謂的Scheduler
S抽象均勻API背後的並發源。 RXJAVA 3具有可通過Schedulers
實用程序類訪問的幾個標準調度程序。
Schedulers.computation()
:在背景中的固定數量的專用線程上運行計算密集型工作。大多數異步運算符將其用作其默認Scheduler
。Schedulers.io()
:在動態更改的線程集上運行I/Olike或阻止操作。Schedulers.single()
:以順序和FIFO方式在單個線程上運行工作。Schedulers.trampoline()
:通常用於測試目的,以一個參與線程之一以連續的方式進行工作。這些都可以在所有JVM平台上找到,但是某些特定平台(例如Android)都有自己的典型Scheduler
定義: AndroidSchedulers.mainThread()
, SwingScheduler.instance()
或JavaFXScheduler.platform()
。
此外,還有一個選擇將現有的Executor
(及其子類型(例如ExecutorService
Schedulers.from(Executor)
Scheduler
中。例如,可以使用這可以使用較大但仍然固定的線程池(分別與computation()
和io()
不同)。
Thread.sleep(2000);
最後不是偶然的。在rxjava中,默認Scheduler
在守護程序線程上運行,這意味著一旦Java主線程退出,它們都將停止,而背景計算可能永遠不會發生。在此示例情況下睡一段時間,可以讓您看到節流在控制台上的輸出,並有時間備用。
rxjava中的流程在自然界中是連續的,分為可以彼此同時運行的處理階段:
Flowable . range ( 1 , 10 )
. observeOn ( Schedulers . computation ())
. map ( v -> v * v )
. blockingSubscribe ( System . out :: println );
此示例流將數字從計算Scheduler
中的1到10平方,並在“主”線程上消耗結果(更準確地說,是blockingSubscribe
的呼叫者線程)。但是,lambda v -> v * v
在此流程中並未並行運行;它在同一計算線程上接一個地接收值1到10的值。
並行處理數字1到10的涉及:
Flowable . range ( 1 , 10 )
. flatMap ( v ->
Flowable . just ( v )
. subscribeOn ( Schedulers . computation ())
. map ( w -> w * w )
)
. blockingSubscribe ( System . out :: println );
實際上,rxjava中的並行性意味著運行獨立的流並將結果融合到單個流中。操作員flatMap
首先將每個數字從1到10映射到其自己的個人Flowable
中,運行它們並合併計算的正方形。
但是請注意, flatMap
不能保證任何訂單,並且內部流的項目可能最終會交錯。有其他操作員:
concatMap
,一次映射並運行一個內部流concatMapEager
運行所有內部流“一次”,但輸出流將按順序創建這些內部流。另外, Flowable.parallel()
操作員和ParallelFlowable
類型有助於實現相同的並行處理模式:
Flowable . range ( 1 , 10 )
. parallel ()
. runOn ( Schedulers . computation ())
. map ( v -> v * v )
. sequential ()
. blockingSubscribe ( System . out :: println );
flatMap
是一個強大的操作員,在許多情況下有助於。例如,給定的服務可以返回Flowable
服務,我們想撥打另一個服務,該服務具有第一個服務發出的值:
Flowable < Inventory > inventorySource = warehouse . getInventoryAsync ();
inventorySource
. flatMap ( inventoryItem -> erp . getDemandAsync ( inventoryItem . getId ())
. map ( demand -> "Item " + inventoryItem . getName () + " has demand " + demand ))
. subscribe ( System . out :: println );
有時,當項目可用時,人們希望對其進行一些依賴的計算。這有時稱為連續性,取決於應該發生的事情和涉及的類型,可能涉及各種操作員以實現。
最典型的情況是給出一個價值,調用另一項服務,等待其結果:
service . apiCall ()
. flatMap ( value -> service . anotherApiCall ( value ))
. flatMap ( next -> service . finalCall ( next ))
通常情況下,以後的序列也需要早期映射的值。這可以通過將外部flatMap
移動到上一個flatMap
的內部來實現:例如:
service . apiCall ()
. flatMap ( value ->
service . anotherApiCall ( value )
. flatMap ( next -> service . finalCallBoth ( value , next ))
)
在這裡,原始value
將在Inner flatMap
內提供,由Lambda變量捕獲提供。
在其他情況下,第一個源/數據流的結果是無關緊要的,並且希望繼續使用準獨立的另一個來源。在這裡, flatMap
也可以工作:
Observable continued = sourceObservable . flatMapSingle ( ignored -> someSingleSource )
continued . map ( v -> v . toString ())
. subscribe ( System . out :: println , Throwable :: printStackTrace );
但是,在這種情況下的延續是Observable
而不是可能更合適的Single
。 (這是可以理解的,因為從flatMapSingle
的角度來看, sourceObservable
是一個多價值源,因此映射也可能導致多個值)。
通常,有一種方法可以通過使用Completable
調解員及其操作員andThen
恢復其他內容,但有一種更具表現力(以及較低的開銷)的方法:
sourceObservable
. ignoreElements () // returns Completable
. andThen ( someSingleSource )
. map ( v -> v . toString ())
sourceObservable
和someSingleSource
之間的唯一依賴是,前者應該正常完成,以便消耗後者。
有時,上一個序列與新序列之間存在隱式數據依賴性,由於某種原因,這些序列沒有流過“常規通道”。一個人傾向於寫下以下延續:
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . just ( count . get ()))
. subscribe ( System . out :: println );
不幸的是,此打印0
是因為Single.just(count.get())
在數據流尚未運行時會在彙編時間評估。我們需要一些對該Single
來源評估的東西,直到主要來源完成時運行時:
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . defer (() -> Single . just ( count . get ())))
. subscribe ( System . out :: println );
或者
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . fromCallable (() -> count . get ()))
. subscribe ( System . out :: println );
有時,源或服務會返回與應與之合作的流動不同的類型。例如,在上面的庫存示例中, getDemandAsync
可以返回一個Single<DemandRecord>
。如果代碼示例保持不變,這將導致編譯時間錯誤(但是,通常會出現有關缺乏過載的誤導錯誤消息)。
在這種情況下,通常有兩個選擇來修復轉換:1)轉換為所需類型或2)查找並使用支持不同類型的特定操作員的超載。
每個反應性基類都有操作員可以執行此類轉換,包括協議轉換,以匹配其他類型。以下矩陣顯示可用的轉換選項:
可流動 | 可觀察 | 單身的 | 或許 | 可完整 | |
---|---|---|---|---|---|
可流動 | toObservable | first , firstOrError , single , singleOrError , last , lastOrError 1 | firstElement , singleElement , lastElement | ignoreElements | |
可觀察 | toFlowable 2 | first , firstOrError , single , singleOrError , last , lastOrError 1 | firstElement , singleElement , lastElement | ignoreElements | |
單身的 | toFlowable 3 | toObservable | toMaybe | ignoreElement | |
或許 | toFlowable 3 | toObservable | toSingle | ignoreElement | |
可完整 | toFlowable | toObservable | toSingle | toMaybe |
1 :將多價源變成單值源時,應確定應將許多源值視為結果中的哪一個。
2 :將Observable
到Flowable
需要額外決定:如何處理Observable
源流量的潛在流動?有幾種策略(例如通過BackpressureStrategy
參數進行緩衝,掉落,保留最新內容)或通過標準Flowable
操作員,例如onBackpressureBuffer
, onBackpressureDrop
, onBackpressureLatest
這也允許進一步定制背壓行為。
3 :當僅(最多)一個源項目時,背壓就沒有問題,因為它可以始終存儲,直到下游可以消耗為止。
許多經常使用的操作員的過載可以處理其他類型。這些通常以目標類型的後綴命名:
操作員 | 超載 |
---|---|
flatMap | flatMapSingle , flatMapMaybe , flatMapCompletable , flatMapIterable |
concatMap | concatMapSingle , concatMapMaybe , concatMapCompletable , concatMapIterable |
switchMap | switchMapSingle , switchMapMaybe , switchMapCompletable |
這些運算符具有後綴而不是簡單地具有不同簽名的同名的原因是類型擦除。 Java不考慮簽名,例如operator(Function<T, Single<R>>)
和operator(Function<T, Maybe<R>>)
不同(與C#不同),並且由於擦除,兩個operator
員最終會出現作為具有相同簽名的重複方法。
命名編程是最困難的事情之一,因為預計名稱不會長,表現力,捕捉且易於難忘。不幸的是,目標語言(和預先存在的慣例)在這方面可能不會給出太多幫助(無法使用的關鍵字,類型擦除,類型的歧義等)。
在原始的rx.net中,發出單個項目然後完成的操作員稱為Return(T)
。由於Java約定是要有一個小寫字母啟動方法名稱,因此這將是return(T)
這是Java中的關鍵字,因此無法使用。因此,rxjava選擇just(T)
。操作員Switch
也存在相同的限制,該開關必須命名為switchOnNext
。另一個例子是Catch
,它被命名為onErrorResumeNext
。
許多期望用戶提供一些返回反應類型的功能的操作員無法超載,因為圍繞Function<T, X>
將這種方法簽名的類型擦除變成了重複。 RXJAVA選擇通過將類型附加為後綴來命名此類操作員:
Flowable < R > flatMap ( Function <? super T , ? extends Publisher <? extends R >> mapper )
Flowable < R > flatMapMaybe ( Function <? super T , ? extends MaybeSource <? extends R >> mapper )
即使某些操作員在擦除類型中沒有問題,但他們的簽名可能含糊不清,尤其是如果有人使用Java 8和Lambdas。例如,將各種其他反應性基礎類型作為參數(為了在基礎實施中提供便利性和績效益處),有幾種concatWith
的過載:
Flowable < T > concatWith ( Publisher <? extends T > other );
Flowable < T > concatWith ( SingleSource <? extends T > other );
Publisher
和SingleSource
均以功能接口(具有一種抽象方法的類型)出現,並可能鼓勵用戶嘗試提供lambda表達式:
someSource . concatWith ( s -> Single . just ( 2 ))
. subscribe ( System . out :: println , Throwable :: printStackTrace );
不幸的是,此方法不起作用,示例根本不打印2
。實際上,由於版本2.1.10,它甚至沒有編譯,因為至少有4個concatWith
過載,並且編譯器會發現上面的代碼。
在這種情況下的用戶可能希望推遲一些計算,直到someSource
完成完成為止,因此,正確的明確操作員應該已defer
:
someSource . concatWith ( Single . defer (() -> Single . just ( 2 )))
. subscribe ( System . out :: println , Throwable :: printStackTrace );
有時,添加後綴以避免邏輯歧義可能會編譯但在流中產生錯誤的類型:
Flowable < T > merge ( Publisher <? extends Publisher <? extends T >> sources );
Flowable < T > mergeArray ( Publisher <? extends T >... sources );
當功能接口類型作為類型參數T
涉及時,這也可能會變得模棱兩可。
數據流可能會失敗,此時錯誤將發出給消費者。但是,有時候,多個來源可能會失敗,在這一點上,可以選擇是否等待所有這些來源完成或失敗。為了表明這一機會,許多操作員名稱都帶有DelayError
單詞的後綴(而其他運營商的名稱則具有delayError
或delayErrors
boolean flag,其中一個過載):
Flowable < T > concat ( Publisher <? extends Publisher <? extends T >> sources );
Flowable < T > concatDelayError ( Publisher <? extends Publisher <? extends T >> sources );
當然,各種後綴可能會一起出現:
Flowable < T > concatArrayEagerDelayError ( Publisher <? extends T >... sources );
由於上面有大量的靜態和實例方法,因此可以將基類認為很重。 RXJAVA 3的設計受反應流規範的嚴重影響,因此,庫具有每種反應類型的類和接口:
類型 | 班級 | 介面 | 消費者 |
---|---|---|---|
0..n背壓 | Flowable | Publisher 1 | Subscriber |
0..n無限 | Observable | ObservableSource 2 | Observer |
1個元素或錯誤 | Single | SingleSource | SingleObserver |
0..1元素或錯誤 | Maybe | MaybeSource | MaybeObserver |
0元素或錯誤 | Completable | CompletableSource | CompletableObserver |
1 org.reactivestreams.Publisher
是外部反應流庫的一部分。它是通過由反應流規範控制的標準化機制與其他反應性庫進行交互的主要類型。
2接口的命名約定是將Source
附加到半傳統類名稱中。沒有FlowableSource
,因為Publisher
是由反應流庫提供的(並且它也無法幫助您進行互操作的幫助)。但是,從反應流規範的意義上講,這些界面不是標準的,目前僅針對RXJAVA。
默認情況下,rxjava本身不需要任何proguard/r8設置,並且應該毫無問題地工作。不幸的是,自版本1.0.3以來,反應流的依賴性已將Java 9類文件嵌入其JAR中,該文件可能會引起普通Proguard的警告:
Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher
建議在應用程序的proguard-ruleset
文件中設置以下-dontwarn
條目:
-dontwarn java.util.concurrent.Flow*
對於R8,rxjava jar包括具有相同非警告子句的META-INF/proguard/rxjava3.pro
,應自動應用。
有關更多詳細信息,請諮詢Wiki。
3.X版正在開發。 BugFix將應用於2.X和3.X分支,但新功能僅添加到3.x。
次要3.x增量(例如3.1、3.2等)將在添加非平凡的新功能或發生重大增強或錯誤修復時發生的行為變化可能會影響某些邊緣情況(例如依賴於從一個錯誤)。一個增強功能的示例將對這為以前不支持它的操作員添加反應性拉力支持。這應該是向後兼容的,但行為的行為也有所不同。
補丁3.xy增量(例如3.0.0-> 3.0.1,3.3.1-> 3.3.2等)將發生在錯誤修復和瑣碎功能(例如添加方法過載)。還可以在補丁發布中添加用@Beta
或@Experimental
註釋標記的新功能,以允許快速探索和迭代不穩定的新功能。
在類或方法級別上標有@Beta
註釋的API可能會更改。可以隨時以任何方式修改它們。如果您的代碼本身是庫本身(即它在控制外用戶的類Path上使用),則不應使用Beta API,除非您重新包裝它們(例如使用Proguard,Shading等)。
在班級或方法級別上標有@Experimental
註釋的API幾乎可以肯定會改變。可以隨時以任何方式修改它們。您不應在任何生產代碼中使用或依靠它們。它們純粹是為了進行廣泛的測試和反饋。
在類或方法級別上標有@Deprecated
註釋的API將一直支持直到下一個主要版本,但建議停止使用它們。
io.reactivex.rxjava3.internal.*
軟件包被視為私有API,完全不應依靠。它可以隨時改變。
http://reactivex.io/RxJava/3.x/javadoc/3.xy/
可以在http://search.maven.org上找到Maven,Ivy,Gradle和其他人的二進制信息和依賴信息。
Gradle的示例:
implementation ' io.reactivex.rxjava3:rxjava:x.y.z '
而對於馬文:
< dependency >
< groupId >io.reactivex.rxjava3</ groupId >
< artifactId >rxjava</ artifactId >
< version >x.y.z</ version >
</ dependency >
對於常春藤:
< dependency org = " io.reactivex.rxjava3 " name = " rxjava " rev = " x.y.z " />
5月1日之後的快照,2021年可通過https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/
repositories {
maven { url ' https://oss.sonatype.org/content/repositories/snapshots ' }
}
dependencies {
implementation ' io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT '
}
可在http://reactivex.io/rxjava/3.x/javadoc/snapshot上找到Javadoc快照
構建:
$ git clone [email protected]:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build
可以在Wiki的入門頁面上找到有關建築物的更多詳細信息。
有關錯誤,問題和討論,請使用GitHub問題。
Copyright (c) 2016-present, RxJava Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.