RXJAVA是反应性扩展的Java VM实现:用于通过使用可观察的序列来组成异步和基于事件的程序的库。
它扩展了观察者模式以支持数据/事件的序列,并添加了运算符,使您可以声明地将序列组合在一起,同时抽象对低级线程,同步,线程安全和并发数据结构等事物的担忧。
在Wiki Home上了解有关RXJAVA的更多信息。
请阅读3.0中不同的内容,以获取从2.x升级时的更改和迁移信息的详细信息。
截至2021年2月28日, 2.x版本是寿命。不会发生进一步的开发,支持,维护,PR和更新。最后一个版本的Javadoc 2.2.21将保持可访问。
截至2018年3月31日, 1.x版本是寿命。不会发生进一步的开发,支持,维护,PR和更新。最后一个版本1.3.8的Javadoc将保持可访问。
第一步是将RXJAVA 3包括在您的项目中,例如,作为Gradle编译依赖性:
implementation " io.reactivex.rxjava3:rxjava:3.x.y "
(请用最新版本编号替换x
和y
:)
第二个是编写Hello World节目:
package rxjava . examples ;
import io . reactivex . rxjava3 . core .*;
public class HelloWorld {
public static void main ( String [] args ) {
Flowable . just ( "Hello world" ). subscribe ( System . out :: println );
}
}
请注意,RXJAVA 3组件现在生活在io.reactivex.rxjava3
下,基础类别和接口生活在io.reactivex.rxjava3.core
下。
RXJAVA 3具有几个可以在:
io.reactivex.rxjava3.core.Flowable
:0..n流动,支持反向流和背压io.reactivex.rxjava3.core.Observable
:0..n流,没有背压,io.reactivex.rxjava3.core.Single
:恰好1项或错误的流程,io.reactivex.rxjava3.core.Completable
:一个没有项目的流量,但仅一个完成或错误信号,io.reactivex.rxjava3.core.Maybe
:一个没有项目的流程,恰好一个项目或错误。RXJAVA中的数据流由一个源,零或更多中间步骤组成,然后是数据消费者或组合步骤(在其中责任通过某些方式消耗数据流):
source . operator1 (). operator2 (). operator3 (). subscribe ( consumer );
source . flatMap ( value -> source . operator1 (). operator2 (). operator3 ());
在这里,如果我们想象自己在operator2
上,向左望向源被称为上游。向右寻找订阅者/消费者的右侧称为下游。当每个元素都写在单独的行上时,这通常更为明显:
source
. operator1 ()
. operator2 ()
. operator3 ()
. subscribe ( consumer )
在RXJAVA的文档中,排放,发射,项目,事件,信号,数据和消息被视为同义词,代表沿数据流传播的对象。
当数据流通过异步步骤运行时,每个步骤都可能以不同的速度执行不同的事情。为了避免压倒性的步骤,通常会出现由于暂时缓冲或需要跳过/删除数据而增加的内存使用情况,应用了所谓的背压,这是流程控制的一种形式,这些步骤可以在其中表达多少个项目他们准备好处理了吗?这允许在通常没有办法了解上游将发送多少个项目的情况下,限制数据流的内存使用情况。
在rxjava中,专用Flowable
类被指定为支持背压, Observable
到的非封闭式操作(简短序列,GUI相互作用等)。其他类型的Single
, Maybe
和Completable
不支持背压,也不应;总是有临时存储一件物品的空间。
通过应用各种中间运算符来制备数据流程在所谓的组装时间内发生:
Flowable < Integer > flow = Flowable . range ( 1 , 5 )
. map ( v -> v * v )
. filter ( v -> v % 3 == 0 )
;
在这一点上,数据还没有流动,也没有发生副作用。
当调用subscribe()
流动以建立内部处理步骤链的流程时,这是一个临时状态:
flow . subscribe ( System . out :: println )
这是触发订阅副作用的时候(请参阅doOnSubscribe
)。一些来源在此状态下立即阻止或开始发射项目。
这是流量积极发射项目,错误或完成信号的状态:
Observable . create ( emitter -> {
while (! emitter . isDisposed ()) {
long time = System . currentTimeMillis ();
emitter . onNext ( time );
if ( time % 2 != 0 ) {
emitter . onError ( new IllegalStateException ( "Odd millisecond!" ));
break ;
}
}
})
. subscribe ( System . out :: println , Throwable :: printStackTrace );
实际上,这是上面给定示例的主体执行的时候。
RXJAVA的常见用例之一是在背景线程上运行一些计算,网络请求,并在UI线程上显示结果(或错误):
import io . reactivex . rxjava3 . schedulers . Schedulers ;
Flowable . fromCallable (() -> {
Thread . sleep ( 1000 ); // imitate expensive computation
return "Done" ;
})
. subscribeOn ( Schedulers . io ())
. observeOn ( Schedulers . single ())
. subscribe ( System . out :: println , Throwable :: printStackTrace );
Thread . sleep ( 2000 ); // <--- wait for the flow to finish
这种链式方法的样式称为流利的API ,类似于构建器图案。但是,rxjava的反应性类型是不变的。每个方法调用都会返回一个具有添加行为的新Flowable
。为了说明,可以按以下方式重写该示例:
Flowable < String > source = Flowable . fromCallable (() -> {
Thread . sleep ( 1000 ); // imitate expensive computation
return "Done" ;
});
Flowable < String > runBackground = source . subscribeOn ( Schedulers . io ());
Flowable < String > showForeground = runBackground . observeOn ( Schedulers . single ());
showForeground . subscribe ( System . out :: println , Throwable :: printStackTrace );
Thread . sleep ( 2000 );
通常,您可以通过subscribeOn
将计算或阻止IO到其他线程。一旦数据准备就绪,您可以通过observeOn
确保它们在前景或GUI线程上处理。
RXJAVA运算符不直接与Thread
S或ExecutorService
服务合作,而是使用所谓的Scheduler
S抽象均匀API背后的并发源。 RXJAVA 3具有可通过Schedulers
实用程序类访问的几个标准调度程序。
Schedulers.computation()
:在背景中的固定数量的专用线程上运行计算密集型工作。大多数异步运算符将其用作其默认Scheduler
。Schedulers.io()
:在动态更改的线程集上运行I/Olike或阻止操作。Schedulers.single()
:以顺序和FIFO方式在单个线程上运行工作。Schedulers.trampoline()
:通常用于测试目的,以一个参与线程之一以连续的方式进行工作。这些都可以在所有JVM平台上找到,但是某些特定平台(例如Android)都有自己的典型Scheduler
定义: AndroidSchedulers.mainThread()
, SwingScheduler.instance()
或JavaFXScheduler.platform()
。
此外,还有一个选择将现有的Executor
(及其子类型(例如ExecutorService
Schedulers.from(Executor)
Scheduler
中。例如,可以使用这可以使用较大但仍然固定的线程池(分别与computation()
和io()
不同)。
Thread.sleep(2000);
最后不是偶然的。在rxjava中,默认Scheduler
在守护程序线程上运行,这意味着一旦Java主线程退出,它们都将停止,而背景计算可能永远不会发生。在此示例情况下睡一段时间,可以让您看到节流在控制台上的输出,并有时间备用。
rxjava中的流程在自然界中是连续的,分为可以彼此同时运行的处理阶段:
Flowable . range ( 1 , 10 )
. observeOn ( Schedulers . computation ())
. map ( v -> v * v )
. blockingSubscribe ( System . out :: println );
此示例流将数字从计算Scheduler
中的1到10平方,并在“主”线程上消耗结果(更准确地说,是blockingSubscribe
的呼叫者线程)。但是,lambda v -> v * v
在此流程中并未并行运行;它在同一计算线程上接一个地接收值1到10的值。
并行处理数字1到10的涉及:
Flowable . range ( 1 , 10 )
. flatMap ( v ->
Flowable . just ( v )
. subscribeOn ( Schedulers . computation ())
. map ( w -> w * w )
)
. blockingSubscribe ( System . out :: println );
实际上,rxjava中的并行性意味着运行独立的流并将结果融合到单个流中。操作员flatMap
首先将每个数字从1到10映射到其自己的个人Flowable
中,运行它们并合并计算的正方形。
但是请注意, flatMap
不能保证任何订单,并且内部流的项目可能最终会交错。有其他操作员:
concatMap
,一次映射并运行一个内部流concatMapEager
运行所有内部流“一次”,但输出流将按顺序创建这些内部流。另外, Flowable.parallel()
操作员和ParallelFlowable
类型有助于实现相同的并行处理模式:
Flowable . range ( 1 , 10 )
. parallel ()
. runOn ( Schedulers . computation ())
. map ( v -> v * v )
. sequential ()
. blockingSubscribe ( System . out :: println );
flatMap
是一个强大的操作员,在许多情况下有助于。例如,给定的服务可以返回Flowable
服务,我们想拨打另一个服务,该服务具有第一个服务发出的值:
Flowable < Inventory > inventorySource = warehouse . getInventoryAsync ();
inventorySource
. flatMap ( inventoryItem -> erp . getDemandAsync ( inventoryItem . getId ())
. map ( demand -> "Item " + inventoryItem . getName () + " has demand " + demand ))
. subscribe ( System . out :: println );
有时,当项目可用时,人们希望对其进行一些依赖的计算。这有时称为连续性,取决于应该发生的事情和涉及的类型,可能涉及各种操作员以实现。
最典型的情况是给出一个价值,调用另一项服务,等待其结果:
service . apiCall ()
. flatMap ( value -> service . anotherApiCall ( value ))
. flatMap ( next -> service . finalCall ( next ))
通常情况下,以后的序列也需要早期映射的值。这可以通过将外部flatMap
移动到上一个flatMap
的内部来实现:例如:
service . apiCall ()
. flatMap ( value ->
service . anotherApiCall ( value )
. flatMap ( next -> service . finalCallBoth ( value , next ))
)
在这里,原始value
将在Inner flatMap
内提供,由Lambda变量捕获提供。
在其他情况下,第一个源/数据流的结果是无关紧要的,并且希望继续使用准独立的另一个来源。在这里, flatMap
也可以工作:
Observable continued = sourceObservable . flatMapSingle ( ignored -> someSingleSource )
continued . map ( v -> v . toString ())
. subscribe ( System . out :: println , Throwable :: printStackTrace );
但是,在这种情况下的延续是Observable
而不是可能更合适的Single
。 (这是可以理解的,因为从flatMapSingle
的角度来看, sourceObservable
是一个多价值源,因此映射也可能导致多个值)。
通常,有一种方法可以通过使用Completable
调解员及其操作员andThen
恢复其他内容,但有一种更具表现力(以及较低的开销)的方法:
sourceObservable
. ignoreElements () // returns Completable
. andThen ( someSingleSource )
. map ( v -> v . toString ())
sourceObservable
和someSingleSource
之间的唯一依赖是,前者应该正常完成,以便消耗后者。
有时,上一个序列与新序列之间存在隐式数据依赖性,由于某种原因,这些序列没有流过“常规通道”。一个人倾向于写下以下延续:
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . just ( count . get ()))
. subscribe ( System . out :: println );
不幸的是,此打印0
是因为Single.just(count.get())
在数据流尚未运行时会在汇编时间评估。我们需要一些对该Single
来源评估的东西,直到主要来源完成时运行时:
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . defer (() -> Single . just ( count . get ())))
. subscribe ( System . out :: println );
或者
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . fromCallable (() -> count . get ()))
. subscribe ( System . out :: println );
有时,源或服务会返回与应与之合作的流动不同的类型。例如,在上面的库存示例中, getDemandAsync
可以返回一个Single<DemandRecord>
。如果代码示例保持不变,这将导致编译时间错误(但是,通常会出现有关缺乏过载的误导错误消息)。
在这种情况下,通常有两个选择来修复转换:1)转换为所需类型或2)查找并使用支持不同类型的特定操作员的超载。
每个反应性基类都有操作员可以执行此类转换,包括协议转换,以匹配其他类型。以下矩阵显示可用的转换选项:
可流动 | 可观察 | 单身的 | 或许 | 可完整 | |
---|---|---|---|---|---|
可流动 | toObservable | first , firstOrError , single , singleOrError , last , lastOrError 1 | firstElement , singleElement , lastElement | ignoreElements | |
可观察 | toFlowable 2 | first , firstOrError , single , singleOrError , last , lastOrError 1 | firstElement , singleElement , lastElement | ignoreElements | |
单身的 | toFlowable 3 | toObservable | toMaybe | ignoreElement | |
或许 | toFlowable 3 | toObservable | toSingle | ignoreElement | |
可完整 | toFlowable | toObservable | toSingle | toMaybe |
1 :将多价源变成单值源时,应确定应将许多源值视为结果中的哪一个。
2 :将Observable
到Flowable
需要额外决定:如何处理Observable
的源流量的潜在流动?有几种策略(例如通过BackpressureStrategy
参数进行缓冲,掉落,保留最新内容)或通过标准Flowable
操作员,例如onBackpressureBuffer
, onBackpressureDrop
, onBackpressureLatest
这也允许进一步定制背压行为。
3 :当仅(最多)一个源项目时,背压就没有问题,因为它可以始终存储,直到下游可以消耗为止。
许多经常使用的操作员的过载可以处理其他类型。这些通常以目标类型的后缀命名:
操作员 | 超载 |
---|---|
flatMap | flatMapSingle , flatMapMaybe , flatMapCompletable , flatMapIterable |
concatMap | concatMapSingle , concatMapMaybe , concatMapCompletable , concatMapIterable |
switchMap | switchMapSingle , switchMapMaybe , switchMapCompletable |
这些运算符具有后缀而不是简单地具有不同签名的同名的原因是类型擦除。 Java不考虑签名,例如operator(Function<T, Single<R>>)
和operator(Function<T, Maybe<R>>)
不同(与C#不同),并且由于擦除,两个operator
员最终会出现作为具有相同签名的重复方法。
命名编程是最困难的事情之一,因为预计名称不会长,表现力,捕捉且易于难忘。不幸的是,目标语言(和预先存在的惯例)在这方面可能不会给出太多帮助(无法使用的关键字,类型擦除,类型的歧义等)。
在原始的rx.net中,发出单个项目然后完成的操作员称为Return(T)
。由于Java约定是要有一个小写字母启动方法名称,因此这将是return(T)
这是Java中的关键字,因此无法使用。因此,rxjava选择just(T)
。操作员Switch
也存在相同的限制,该开关必须命名为switchOnNext
。另一个例子是Catch
,它被命名为onErrorResumeNext
。
许多期望用户提供一些返回反应类型的功能的操作员无法超载,因为围绕Function<T, X>
将这种方法签名的类型擦除变成了重复。 RXJAVA选择通过将类型附加为后缀来命名此类操作员:
Flowable < R > flatMap ( Function <? super T , ? extends Publisher <? extends R >> mapper )
Flowable < R > flatMapMaybe ( Function <? super T , ? extends MaybeSource <? extends R >> mapper )
即使某些操作员在擦除类型中没有问题,但他们的签名可能含糊不清,尤其是如果有人使用Java 8和Lambdas。例如,将各种其他反应性基础类型作为参数(为了在基础实施中提供便利性和绩效益处),有几种concatWith
的过载:
Flowable < T > concatWith ( Publisher <? extends T > other );
Flowable < T > concatWith ( SingleSource <? extends T > other );
Publisher
和SingleSource
均以功能接口(具有一种抽象方法的类型)出现,并可能鼓励用户尝试提供lambda表达式:
someSource . concatWith ( s -> Single . just ( 2 ))
. subscribe ( System . out :: println , Throwable :: printStackTrace );
不幸的是,此方法不起作用,示例根本不打印2
。实际上,由于版本2.1.10,它甚至没有编译,因为至少有4个concatWith
过载,并且编译器会发现上面的代码。
在这种情况下的用户可能希望推迟一些计算,直到someSource
完成完成为止,因此,正确的明确操作员应该已defer
:
someSource . concatWith ( Single . defer (() -> Single . just ( 2 )))
. subscribe ( System . out :: println , Throwable :: printStackTrace );
有时,添加后缀以避免逻辑歧义可能会编译但在流中产生错误的类型:
Flowable < T > merge ( Publisher <? extends Publisher <? extends T >> sources );
Flowable < T > mergeArray ( Publisher <? extends T >... sources );
当功能接口类型作为类型参数T
涉及时,这也可能会变得模棱两可。
数据流可能会失败,此时错误将发出给消费者。但是,有时候,多个来源可能会失败,在这一点上,可以选择是否等待所有这些来源完成或失败。为了表明这一机会,许多操作员名称都带有DelayError
单词的后缀(而其他运营商的名称则具有delayError
或delayErrors
boolean flag,其中一个过载):
Flowable < T > concat ( Publisher <? extends Publisher <? extends T >> sources );
Flowable < T > concatDelayError ( Publisher <? extends Publisher <? extends T >> sources );
当然,各种后缀可能会一起出现:
Flowable < T > concatArrayEagerDelayError ( Publisher <? extends T >... sources );
由于上面有大量的静态和实例方法,因此可以将基类认为很重。 RXJAVA 3的设计受反应流规范的严重影响,因此,库具有每种反应类型的类和接口:
类型 | 班级 | 界面 | 消费者 |
---|---|---|---|
0..n背压 | Flowable | Publisher 1 | Subscriber |
0..n无限 | Observable | ObservableSource 2 | Observer |
1个元素或错误 | Single | SingleSource | SingleObserver |
0..1元素或错误 | Maybe | MaybeSource | MaybeObserver |
0元素或错误 | Completable | CompletableSource | CompletableObserver |
1 org.reactivestreams.Publisher
是外部反应流库的一部分。它是通过由反应流规范控制的标准化机制与其他反应性库进行交互的主要类型。
2接口的命名约定是将Source
附加到半传统类名称中。没有FlowableSource
,因为Publisher
是由反应流库提供的(并且它也无法帮助您进行互操作的帮助)。但是,从反应流规范的意义上讲,这些界面不是标准的,目前仅针对RXJAVA。
默认情况下,rxjava本身不需要任何proguard/r8设置,并且应该毫无问题地工作。不幸的是,自版本1.0.3以来,反应流的依赖性已将Java 9类文件嵌入其JAR中,该文件可能会引起普通Proguard的警告:
Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher
建议在应用程序的proguard-ruleset
文件中设置以下-dontwarn
条目:
-dontwarn java.util.concurrent.Flow*
对于R8,rxjava jar包括具有相同非警告子句的META-INF/proguard/rxjava3.pro
,应自动应用。
有关更多详细信息,请咨询Wiki。
3.X版正在开发。 BugFix将应用于2.X和3.X分支,但新功能仅添加到3.x。
次要3.x增量(例如3.1、3.2等)将在添加非平凡的新功能或发生重大增强或错误修复时发生的行为变化可能会影响某些边缘情况(例如依赖于从一个错误)。一个增强功能的示例将对这为以前不支持它的操作员添加反应性拉力支持。这应该是向后兼容的,但行为的行为也有所不同。
补丁3.xy增量(例如3.0.0-> 3.0.1,3.3.1-> 3.3.2等)将发生在错误修复和琐碎功能(例如添加方法过载)。还可以在补丁发布中添加用@Beta
或@Experimental
注释标记的新功能,以允许快速探索和迭代不稳定的新功能。
在类或方法级别上标有@Beta
注释的API可能会更改。可以随时以任何方式修改它们。如果您的代码本身是库本身(即它在控制外用户的类Path上使用),则不应使用Beta API,除非您重新包装它们(例如使用Proguard,Shading等)。
在班级或方法级别上标有@Experimental
注释的API几乎可以肯定会改变。可以随时以任何方式修改它们。您不应在任何生产代码中使用或依靠它们。它们纯粹是为了进行广泛的测试和反馈。
在类或方法级别上标有@Deprecated
注释的API将一直支持直到下一个主要版本,但建议停止使用它们。
io.reactivex.rxjava3.internal.*
软件包被视为私有API,完全不应依靠。它可以随时改变。
http://reactivex.io/RxJava/3.x/javadoc/3.xy/
可以在http://search.maven.org上找到Maven,Ivy,Gradle和其他人的二进制信息和依赖信息。
Gradle的示例:
implementation ' io.reactivex.rxjava3:rxjava:x.y.z '
而对于马文:
< dependency >
< groupId >io.reactivex.rxjava3</ groupId >
< artifactId >rxjava</ artifactId >
< version >x.y.z</ version >
</ dependency >
对于常春藤:
< dependency org = " io.reactivex.rxjava3 " name = " rxjava " rev = " x.y.z " />
5月1日之后的快照,2021年可通过https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/
repositories {
maven { url ' https://oss.sonatype.org/content/repositories/snapshots ' }
}
dependencies {
implementation ' io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT '
}
可在http://reactivex.io/rxjava/3.x/javadoc/snapshot上找到Javadoc快照
构建:
$ git clone [email protected]:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build
可以在Wiki的入门页面上找到有关建筑物的更多详细信息。
有关错误,问题和讨论,请使用GitHub问题。
Copyright (c) 2016-present, RxJava Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.