Rxjava ist eine Java-VM-Implementierung von reaktiven Erweiterungen: eine Bibliothek zum Komponieren von asynchronen und ereignisbasierten Programmen unter Verwendung beobachtbarer Sequenzen.
Es erweitert das Beobachtermuster, um Sequenzen von Daten/Ereignissen zu unterstützen, und fügt den Operatoren hinzu, die es Ihnen ermöglichen, Sequenzen deklarativ zusammenzustellen, während Sie Bedenken hinsichtlich von Threading, Synchronisation, Thread-Sicherheit und gleichzeitigen Datenstrukturen abstrahieren.
Erfahren Sie mehr über Rxjava im Allgemeinen im Wiki -Haus.
Bitte lesen Sie das What Different in 3.0 für Einzelheiten zu den Änderungen und Migrationsinformationen beim Upgrade von 2.x.
Die 2.x-Version ist zum 28. Februar 2021 am Ende des Lebens. Es werden keine Weiterentwicklung, Unterstützung, Wartung, PRS und Updates stattfinden. Der Javadoc der allerletzten Version 2.2.21 bleibt zugänglich.
Die 1.x-Version ist zum 31. März 2018 am Ende des Lebens. Es werden keine Weiterentwicklung, Unterstützung, Wartung, PRS und Updates stattfinden. Der Javadoc der allerletzten Version 1.3.8 bleibt zugänglich.
Der erste Schritt besteht darin, RXJAVA 3 in Ihr Projekt einzubeziehen, beispielsweise als Abhängigkeit von Gradle Compile:
implementation " io.reactivex.rxjava3:rxjava:3.x.y "
(Bitte ersetzen Sie x
und y
durch die neuesten Versionsnummern :)
Die zweite besteht darin, das Hello World -Programm zu schreiben:
package rxjava . examples ;
import io . reactivex . rxjava3 . core .*;
public class HelloWorld {
public static void main ( String [] args ) {
Flowable . just ( "Hello world" ). subscribe ( System . out :: println );
}
}
Beachten Sie, dass rxjava 3 Komponenten jetzt unter io.reactivex.rxjava3
und den Basisklassen und Schnittstellen unter io.reactivex.rxjava3.core
leben.
RXJAVA 3 verfügt über mehrere Basisklassen, in denen Sie Operatoren entdecken können:
io.reactivex.rxjava3.core.Flowable
: 0..n fließio.reactivex.rxjava3.core.Observable
: 0..n Flows, kein Backdruck,io.reactivex.rxjava3.core.Single
: Ein Fluss von genau 1 Element oder einem Fehler,io.reactivex.rxjava3.core.Completable
: Ein Fluss ohne Elemente, aber nur ein Abschluss oder ein Fehlersignal,io.reactivex.rxjava3.core.Maybe
: Ein Fluss ohne Elemente, genau ein Element oder einen Fehler.Die Datenflows in Rxjava bestehen aus einer Quelle, einer Null- oder mehr Zwischenschritte, gefolgt von einem Datenverbraucher- oder Kombinatorschritt (wobei der Schritt für den Konsum des Datenflusss auf einige Weise verantwortlich ist):
source . operator1 (). operator2 (). operator3 (). subscribe ( consumer );
source . flatMap ( value -> source . operator1 (). operator2 (). operator3 ());
Wenn wir uns hier auf operator2
vorstellen, wird es stromaufwärts genannt, nach links nach nach links zu schauen. Der Blick nach rechts in Richtung Abonnent/Verbraucher wird nachgeschaltet bezeichnet. Dies ist oft deutlicher, wenn jedes Element in einer separaten Zeile geschrieben wird:
source
. operator1 ()
. operator2 ()
. operator3 ()
. subscribe ( consumer )
In der Dokumentation von RXJAVA werden Emission , Emission , Element , Ereignis , Signal , Daten und Nachricht als Synonyme angesehen und repräsentieren das Objekt, das entlang des Datenflusss fährt.
Wenn der DataFlow durch asynchrone Schritte ausgeführt wird, kann jeder Schritt unterschiedliche Dinge mit unterschiedlicher Geschwindigkeit ausführen. Um solche Schritte zu vermeiden, die sich normalerweise als erhöhte Speicherverwendung aufgrund der vorübergehenden Pufferung oder der Notwendigkeit des Überspringens/Löschens von Daten manifestieren, wird der sogenannte Rückdruck angewendet, was eine Form der Flussregelung ist, bei der die Schritte ausdrücken können, wie viele Elemente ausdrücken können, wie viele Elemente ausdrücken können Sind sie bereit zu verarbeiten? Dies ermöglicht die Einschränkung der Speicherverwendung der Datenflows in Situationen, in denen es im Allgemeinen keine Möglichkeit gibt, einen Schritt zu wissen, wie viele Elemente der Upstream an ihn senden.
In Rxjava ist die dedizierte Flowable
Klasse so ausgewiesen, dass sie Backdruck unterstützt, und Observable
ist den nicht zurückgeschwungenen Operationen (kurze Sequenzen, GUI-Interaktionen usw.) gewidmet. Die anderen Typen, Single
, Maybe
und Completable
unterstützen weder Backdruck noch sollten sie; Es gibt immer Platz, um einen Artikel vorübergehend aufzubewahren.
Die Erstellung von Datenflows durch Anwenden verschiedener Zwischenbetreiber erfolgt in der sogenannten Montagezeit :
Flowable < Integer > flow = Flowable . range ( 1 , 5 )
. map ( v -> v * v )
. filter ( v -> v % 3 == 0 )
;
Zu diesem Zeitpunkt fließen die Daten noch nicht und es treten keine Nebenwirkungen auf.
Dies ist ein temporärer Zustand, wenn subscribe()
auf einen Fluss aufgerufen wird, der die interne Kette der Verarbeitungsschritte festlegt:
flow . subscribe ( System . out :: println )
Dies ist der Zeitpunkt, an dem die Abonnement-Nebenwirkungen ausgelöst werden (siehe doOnSubscribe
). Einige Quellen blockieren oder beginnen, Elemente sofort in diesem Zustand zu emittieren.
Dies ist der Zustand, in dem die Strömungen aktiv Elemente, Fehler oder Fertigstellungssignale ausgeben:
Observable . create ( emitter -> {
while (! emitter . isDisposed ()) {
long time = System . currentTimeMillis ();
emitter . onNext ( time );
if ( time % 2 != 0 ) {
emitter . onError ( new IllegalStateException ( "Odd millisecond!" ));
break ;
}
}
})
. subscribe ( System . out :: println , Throwable :: printStackTrace );
Dies ist dann der Fall, in dem der Körper des angegebenen Beispiels ausgeführt wird.
Einer der gängigen Anwendungsfälle für Rxjava ist die Ausführung einer Berechnung, die Netzwerkanforderung in einem Hintergrund -Thread und die Anzeige der Ergebnisse (oder Fehler) im UI -Thread:
import io . reactivex . rxjava3 . schedulers . Schedulers ;
Flowable . fromCallable (() -> {
Thread . sleep ( 1000 ); // imitate expensive computation
return "Done" ;
})
. subscribeOn ( Schedulers . io ())
. observeOn ( Schedulers . single ())
. subscribe ( System . out :: println , Throwable :: printStackTrace );
Thread . sleep ( 2000 ); // <--- wait for the flow to finish
Diese Art von Kettenmethoden wird als fließende API bezeichnet, die dem Builder -Muster ähnelt. Die reaktiven Typen von Rxjava sind jedoch unveränderlich; Jede der Methodenaufrufe gibt ein neues Flowable
mit zusätzlichem Verhalten zurück. Zur Veranschaulichung kann das Beispiel wie folgt umgeschrieben werden:
Flowable < String > source = Flowable . fromCallable (() -> {
Thread . sleep ( 1000 ); // imitate expensive computation
return "Done" ;
});
Flowable < String > runBackground = source . subscribeOn ( Schedulers . io ());
Flowable < String > showForeground = runBackground . observeOn ( Schedulers . single ());
showForeground . subscribe ( System . out :: println , Throwable :: printStackTrace );
Thread . sleep ( 2000 );
Normalerweise können Sie Berechnungen oder Blockieren von IO über subscribeOn
in einen anderen Thread verschieben. Sobald die Daten fertig sind, können Sie sicherstellen, dass sie im Vordergrund oder GUI -Thread über observeOn
verarbeitet werden.
RXJAVA-Betreiber arbeiten nicht direkt mit Thread
oder ExecutorService
S, sondern mit sogenannten Scheduler
, die hinter einer einheitlichen API abstrahieren. Rxjava 3 verfügt über mehrere Standard -Scheduler, die über Schedulers
Utility -Klasse zugänglich sind.
Schedulers.computation()
: Rechenintensive Arbeiten auf einer festen Anzahl dedizierter Threads im Hintergrund ausführen. Die meisten asynchronen Operatoren verwenden dies als Scheduler
.Schedulers.io()
: I/O-like oder blockierende Vorgänge auf einem dynamisch ändernden Thread-Satz ausführen.Schedulers.single()
: Arbeiten Sie auf sequentielle und fifal -Weise auf einem einzelnen Thread aus.Schedulers.trampoline()
: Arbeiten Sie in einem der teilnehmenden Threads in der Regel auf sequentielle und fIFO -Weise aus, normalerweise zu Testzwecken. Diese sind auf allen JVM -Plattformen erhältlich, aber einige spezifische Plattformen wie Android haben ihre eigenen typischen Scheduler
definiert: AndroidSchedulers.mainThread()
, SwingScheduler.instance()
oder JavaFXScheduler.platform()
.
Darüber hinaus besteht die Möglichkeit, einen vorhandenen Executor
(und seine Subtypen wie ExecutorService
) über Schedulers.from(Executor)
in einen Scheduler
einzuwickeln. Dies kann zum Beispiel verwendet werden, um einen größeren, aber immer noch festen Pool von Threads (im Gegensatz zu computation()
bzw. io()
) zu haben.
Der Thread.sleep(2000);
Am Ende ist kein Zufall. In RXJAVA Der Standard Scheduler
-Auslauf auf Daemon -Threads, sobald der Java -Haupt -Thread beendet wird, werden alle gestoppt und Hintergrundberechnungen werden möglicherweise nie stattfinden. Wenn Sie für einige Zeit in diesen Beispielsituationen schlafen, können Sie die Ausgabe des Flusses auf der Konsole mit der Zeit sehen.
Flows in Rxjava sind in Verarbeitungsstadien aufeinanderfolgende Natur, die gleichzeitig miteinander laufen können:
Flowable . range ( 1 , 10 )
. observeOn ( Schedulers . computation ())
. map ( v -> v * v )
. blockingSubscribe ( System . out :: println );
In diesem Beispiel wird die Zahlen von 1 bis 10 auf Scheduler
aufgetreten und die Ergebnisse im "Haupt" -Thread (genauer gesagt den Anrufer -Thread von blockingSubscribe
) konsumiert. Die Lambda v -> v * v
läuft jedoch nicht parallel für diesen Fluss; Es empfängt die Werte 1 bis 10 auf demselben Berechnungs -Thread nacheinander.
Die Verarbeitung der Zahlen 1 bis 10 parallel ist etwas mehr involviert:
Flowable . range ( 1 , 10 )
. flatMap ( v ->
Flowable . just ( v )
. subscribeOn ( Schedulers . computation ())
. map ( w -> w * w )
)
. blockingSubscribe ( System . out :: println );
Praktisch bedeutet Parallelität bei Rxjava, unabhängige Strömungen zu betreiben und ihre Ergebnisse wieder in einen einzigen Fluss zu verschmelzen. Die Operator flatMap
führt dies durch, indem er zuerst jede Zahl von 1 bis 10 in ihren eigenen individuellen Flowable
macht, sie ausführt und die berechneten Quadrate verschmilzt.
Beachten Sie jedoch, dass flatMap
keine Bestellung garantiert und die Elemente aus den inneren Strömen möglicherweise verschachtelt sind. Es gibt alternative Operatoren:
concatMap
, die einen inneren Fluss jeweils kartiert und ausführtconcatMapEager
, das alle inneren Flüsse "gleichzeitig" ausführt, aber der Ausgangsfluss ist in der Reihenfolge, in der diese inneren Strömungen erzeugt wurden. Alternativ können der Flowable.parallel()
-Operator und der ParallelFlowable
-Typ das gleiche parallele Verarbeitungsmuster erreichen:
Flowable . range ( 1 , 10 )
. parallel ()
. runOn ( Schedulers . computation ())
. map ( v -> v * v )
. sequential ()
. blockingSubscribe ( System . out :: println );
flatMap
ist ein leistungsfähiger Bediener und hilft in vielen Situationen. Angesichts eines Dienstes, der ein Flowable
zurückgibt, möchten wir beispielsweise einen anderen Dienst mit Werten aufrufen, die vom ersten Dienst ausgestrahlt werden:
Flowable < Inventory > inventorySource = warehouse . getInventoryAsync ();
inventorySource
. flatMap ( inventoryItem -> erp . getDemandAsync ( inventoryItem . getId ())
. map ( demand -> "Item " + inventoryItem . getName () + " has demand " + demand ))
. subscribe ( System . out :: println );
Manchmal, wenn ein Artikel verfügbar geworden ist, möchte man einige abhängige Berechnungen darauf ausführen. Dies wird manchmal als Kontinuationen bezeichnet, je nachdem, was passieren sollte und welche Typen sich beteiligen, können verschiedene Betreiber einbeziehen.
Das typischste Szenario besteht darin, einen Wert zu geben, einen anderen Service aufzurufen, auf das Ergebnis zu warten und fortzusetzen:
service . apiCall ()
. flatMap ( value -> service . anotherApiCall ( value ))
. flatMap ( next -> service . finalCall ( next ))
Es ist häufig auch so, dass spätere Sequenzen Werte aus früheren Zuordnungen erfordern. Dies kann erreicht werden, indem die äußere flatMap
beispielsweise in die inneren Teile der vorherigen flatMap
bewegt wird:
service . apiCall ()
. flatMap ( value ->
service . anotherApiCall ( value )
. flatMap ( next -> service . finalCallBoth ( value , next ))
)
Hier wird der ursprüngliche value
in der inneren flatMap
mit freundlicher Genehmigung von Lambda Variable Capture erhältlich sein.
In anderen Szenarien sind die Ergebnisse (n) der ersten Quelle/des ersten Datenflows irrelevant und man möchte mit einer quasi unabhängigen anderen Quelle fortsetzen. Hier funktioniert auch flatMap
:
Observable continued = sourceObservable . flatMapSingle ( ignored -> someSingleSource )
continued . map ( v -> v . toString ())
. subscribe ( System . out :: println , Throwable :: printStackTrace );
Die Fortsetzung in diesem Fall bleibt jedoch anstelle der wahrscheinlich geeigneteren Single
Observable
. (Dies ist verständlich, da aus der Perspektive von flatMapSingle
sourceObservable
eine mehrwertige Quelle ist und somit auch die Zuordnung auch zu mehreren Werten führen kann).
Oft gibt es einen Weg, der etwas ausdrucksvoller (und auch niedrigerer Overhead) ist, indem sie als Vermittler und seinen Betreiber Completable
werden andThen
mit etwas anderem wieder aufgenommen werden:
sourceObservable
. ignoreElements () // returns Completable
. andThen ( someSingleSource )
. map ( v -> v . toString ())
Die einzige Abhängigkeit zwischen dem sourceObservable
und dem someSingleSource
ist, dass der erstere normalerweise vervollständigen sollte, damit die letzteren konsumiert werden.
Manchmal gibt es eine implizite Datenabhängigkeit zwischen der vorherigen Sequenz und der neuen Sequenz, die aus irgendeinem Grund nicht durch die "regulären Kanäle" fließt. Man würde geneigt sein, solche Kontinuationen wie folgt zu schreiben:
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . just ( count . get ()))
. subscribe ( System . out :: println );
Leider druckt dies 0
, weil Single.just(count.get())
zum Montagezeit bewertet wird, wenn der Datenfluss noch nicht einmal ausgeführt wurde. Wir brauchen etwas, das die Bewertung dieser Single
Quelle bis zur Laufzeit vertieft, wenn die Hauptquelle abgeschlossen ist:
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . defer (() -> Single . just ( count . get ())))
. subscribe ( System . out :: println );
oder
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . fromCallable (() -> count . get ()))
. subscribe ( System . out :: println );
Manchmal gibt eine Quelle oder ein Dienst einen anderen Typ zurück als den Fluss, der damit funktionieren soll. Zum Beispiel könnte im obigen Inventarbeispiel getDemandAsync
ein Single<DemandRecord>
zurückgeben. Wenn das Codebeispiel unverändert bleibt, führt dies zu einem Kompilierungs-Zeitfehler (häufig mit einer irreführenden Fehlermeldung über die mangelnde Überlastung).
In solchen Situationen gibt es normalerweise zwei Optionen, um die Transformation zu beheben: 1) Um den gewünschten Typ zu konvertieren oder 2) Finden und verwenden Sie eine Überlastung des spezifischen Bedieners, der den verschiedenen Typ unterstützt.
Jede reaktive Basisklasse verfügt über Operatoren, die solche Konvertierungen, einschließlich der Protokollkonvertierungen, ausführen können, um einen anderen Typ zu entsprechen. Die folgende Matrix zeigt die verfügbaren Conversion -Optionen:
Fließbar | Beobachtbar | Einzel | Vielleicht | Vervollständigbar | |
---|---|---|---|---|---|
Fließbar | toObservable | first , firstOrError , single , singleOrError , last , lastOrError 1 | firstElement , singleElement , lastElement | ignoreElements | |
Beobachtbar | toFlowable 2 | first , firstOrError , single , singleOrError , last , lastOrError 1 | firstElement , singleElement , lastElement | ignoreElements | |
Einzel | toFlowable 3 | toObservable | toMaybe | ignoreElement | |
Vielleicht | toFlowable 3 | toObservable | toSingle | ignoreElement | |
Vervollständigbar | toFlowable | toObservable | toSingle | toMaybe |
1 : Wenn eine mehrwertige Quelle in eine einheitliche Quelle verwandelt, sollte man entscheiden, welche der vielen Quellwerte als Ergebnis betrachtet werden sollten.
2 : Ein Observable
in Flowable
Umdrehen erfordert eine zusätzliche Entscheidung: Was tun mit dem potenziellen nicht eingeschränkten Fluss der Quelle Observable
? Es gibt mehrere Strategien (z. B. Pufferung, Ablagerungen, Beibehalten der neuesten) über den Parameter BackpressureStrategy
oder über standardmäßige Flowable
Operatoren wie onBackpressureBuffer
, onBackpressureDrop
, onBackpressureLatest
das auch eine weitere Anpassung des Backdruckverhaltens ermöglicht.
3 : Wenn es nur (höchstens) ein Quellenelement gibt, gibt es kein Problem mit dem Rückdruck, da es immer gespeichert werden kann, bis der Downstream zu konsumieren ist.
Viele häufig verwendete Bediener haben Überladungen, die mit den anderen Typen umgehen können. Diese werden normalerweise mit dem Suffix des Zieltyps benannt:
Operator | Überlastungen |
---|---|
flatMap | flatMapSingle , flatMapMaybe , flatMapCompletable , flatMapIterable |
concatMap | concatMapSingle , concatMapMaybe , concatMapCompletable , concatMapIterable |
switchMap | switchMapSingle , switchMapMaybe , switchMapCompletable |
Der Grund, warum diese Operatoren ein Suffix haben, anstatt einfach denselben Namen mit unterschiedlicher Signatur zu haben, ist die Typ -Löschung. Java berücksichtigt keine Signaturen wie operator(Function<T, Single<R>>)
und operator(Function<T, Maybe<R>>)
unterschiedlich (im Gegensatz zu C#) und aufgrund des Löschens würden die beiden operator
S enden würden als doppelte Methoden mit der gleichen Signatur.
Die Benennung in der Programmierung ist eines der schwierigsten Dinge, da die Namen nicht lang, ausdrucksstark, erfasst und leicht unvergesslich sind. Leider leisten die Zielsprache (und bereits bestehende Konventionen) diesbezüglich möglicherweise nicht zu viel Hilfe (unbrauchbare Schlüsselwörter, Typ-Löschen, Typ-Unklarheiten usw.).
Im ursprünglichen RX.NET wird der Bediener, der ein einzelnes Element emittiert und dann abschließt, als Return(T)
bezeichnet. Da die Java -Konvention einen Kleinbuchstaben -Brief starten soll, wäre dies return(T)
ein Schlüsselwort in Java und somit nicht verfügbar. Daher hat sich Rxjava entschieden, diesen Operator just(T)
zu benennen. Die gleiche Einschränkung ist für den Switch
vorhanden, der als switchOnNext
bezeichnet werden musste. Ein weiteres Beispiel ist Catch
, der onErrorResumeNext
genannt wurde.
Viele Operatoren, die erwarten, dass der Benutzer eine Funktion zur Rückgabe eines reaktiven Typs zur Verfügung stellt, können nicht überladen werden, da das Typ -Löschen um eine Function<T, X>
verwandelt solche Methodensignaturen in Duplikate. RXJAVA hat sich entschieden, solche Betreiber zu benennen, indem sie den Typ auch als Suffix anhängen:
Flowable < R > flatMap ( Function <? super T , ? extends Publisher <? extends R >> mapper )
Flowable < R > flatMapMaybe ( Function <? super T , ? extends MaybeSource <? extends R >> mapper )
Obwohl bestimmte Betreiber keine Probleme haben, um die Typ -Löschung zu erteilen, kann ihre Unterschrift mehrdeutig auftreten, insbesondere wenn man Java 8 und Lambdas verwendet. Beispielsweise gibt es mehrere Überladungen von concatWith
, die die verschiedenen anderen reaktiven Basistypen als Argumente (zur Bereitstellung von Komfort- und Leistungsvorteilen in der zugrunde liegenden Implementierung) einnehmen:
Flowable < T > concatWith ( Publisher <? extends T > other );
Flowable < T > concatWith ( SingleSource <? extends T > other );
Sowohl Publisher
als auch SingleSource
erscheinen als funktionale Schnittstellen (Typen mit einer abstrakten Methode) und können Benutzer dazu ermutigen, einen Lambda -Ausdruck bereitzustellen:
someSource . concatWith ( s -> Single . just ( 2 ))
. subscribe ( System . out :: println , Throwable :: printStackTrace );
Leider funktioniert dieser Ansatz nicht und das Beispiel druckt 2
überhaupt nicht aus. Tatsächlich kompiliert es seit Version 2.1.10 nicht einmal, da mindestens 4 concatWith
vorhanden sind und der Compiler den Code über zweideutig findet.
Der Benutzer in solchen Situationen wollte wahrscheinlich eine Berechnung aufschieben, bis die someSource
abgeschlossen ist. Daher hätte der richtige eindeutige Operator defer
werden müssen:
someSource . concatWith ( Single . defer (() -> Single . just ( 2 )))
. subscribe ( System . out :: println , Throwable :: printStackTrace );
Manchmal wird ein Suffix hinzugefügt, um logische Unklarheiten zu vermeiden, die möglicherweise kompilieren, aber den falschen Typ in einem Fluss erzeugen:
Flowable < T > merge ( Publisher <? extends Publisher <? extends T >> sources );
Flowable < T > mergeArray ( Publisher <? extends T >... sources );
Dies kann auch mehrdeutig werden, wenn sich funktionale Schnittstellentypen als Typ -Argument T
engagieren.
Datenflows können fehlschlagen, an diesem Punkt wird der Fehler an die Verbraucher (en) emittiert. Manchmal können jedoch mehrere Quellen scheitern, an welchem Punkt die Wahl besteht, ob alle sie abschließen oder scheitern können oder nicht. Um diese Gelegenheit anzuzeigen, sind viele Bedienernamen mit den DelayError
-Wörtern satt (während andere in einer ihrer Überladungen eine Boolesche Flagge delayError
oder delayErrors
aufweisen):
Flowable < T > concat ( Publisher <? extends Publisher <? extends T >> sources );
Flowable < T > concatDelayError ( Publisher <? extends Publisher <? extends T >> sources );
Natürlich können Suffixe verschiedener Arten zusammen erscheinen:
Flowable < T > concatArrayEagerDelayError ( Publisher <? extends T >... sources );
Die Basisklassen können aufgrund der schiere Anzahl von statischen und Instanzmethoden als schwer angesehen werden. Das Design von Rxjava 3 wurde stark von der Spezifikation der reaktiven Streams beeinflusst. Daher enthält die Bibliothek eine Klasse und eine Schnittstelle pro reaktivem Typ:
Typ | Klasse | Schnittstelle | Verbraucher |
---|---|---|---|
0..N Backdruck | Flowable | Publisher 1 | Subscriber |
0..n unbegrenzt | Observable | ObservableSource 2 | Observer |
1 Element oder Fehler | Single | SingleSource | SingleObserver |
0..1 Element oder Fehler | Maybe | MaybeSource | MaybeObserver |
0 Element oder Fehler | Completable | CompletableSource | CompletableObserver |
1 Die org.reactivestreams.Publisher
Es ist der Haupttyp, um mit anderen reaktiven Bibliotheken durch einen standardisierten Mechanismus zu interagieren, der von der Spezifikation der reaktiven Ströme bestimmt wird.
2 Die Namenskonvention der Schnittstelle bestand darin, Source
an den semi-traditionellen Klassennamen anzuhängen. Es gibt keine FlowableSource
, da Publisher
von der Reactive Streams Library bereitgestellt wird (und dass sie auch bei der Interoperation nicht geholfen hätte). Diese Schnittstellen sind jedoch im Sinne der reaktiven Streams -Spezifikation nicht Standard und sind derzeit nur rxjava spezifisch.
Standardmäßig benötigt Rxjava selbst keine Einstellungen für Proguard/R8 und sollte ohne Probleme funktionieren. Leider hat die Abhängigkeit von Reaktiven seit Version 1.0.3 Java 9 -Klassendateien in das Glas eingebettet, die mit dem einfachen Proguard Warnungen hervorrufen können:
Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher
Es wird empfohlen, den folgenden -dontwarn
in der proguard-ruleset
-Datei der Anwendung einzurichten:
-dontwarn java.util.concurrent.Flow*
Für R8 enthält das Rxjava-Glas die META-INF/proguard/rxjava3.pro
mit derselben No-Warn-Klausel und sollte automatisch angewendet werden.
Weitere Informationen erhalten Sie vom Wiki.
Version 3.x ist in der Entwicklung. Bugfixes werden sowohl auf 2.x- als auch auf 3.x -Zweige angewendet, aber neue Funktionen werden nur zu 3.x hinzugefügt.
Minor 3.x-Inkremente (z. B. 3.1, 3.2 usw.) treten auf, wenn nicht-triviale neue Funktionen hinzugefügt werden, oder erhebliche Verbesserungen oder Fehlerbehebungen, die Verhaltensänderungen aufweisen können, die einige Randfälle beeinflussen können (z. ein Fehler). Ein Beispiel für eine Verbesserung, die klassifiziert wird, da dies einem Bediener reaktiven Pull -Backdruck -Unterstützung hinzufügt, der es zuvor nicht unterstützte. Dies sollte rückwärtskompatibel sein, sich jedoch anders verhalten.
Patch 3.xy -Inkremente (z. B. 3.0.0 -> 3.0.1, 3.3.1 -> 3.3.2 usw.) tritt bei Fehlerbehebungen und trivialen Funktionen auf (z. B. Hinzufügen einer Methodenüberlastung). Neue Funktionalität, die mit einer @Beta
oder @Experimental
gekennzeichnet ist, kann auch in den Patch -Releases hinzugefügt werden, um eine schnelle Erforschung und Iteration instabiler neuer Funktionen zu ermöglichen.
APIs, die mit der @Beta
-Annotation auf der Klasse oder Methodenebene gekennzeichnet sind, können sich ändern. Sie können jederzeit in irgendeiner Weise oder sogar entfernt werden. Wenn Ihr Code selbst eine Bibliothek ist (dh er wird auf dem Klassenpfad von Benutzern außerhalb Ihrer Kontrolle verwendet), sollten Sie keine Beta -APIs verwenden, es sei denn, Sie können sie neu verpacken (z. B. mit Proguard, Shading usw.).
APIs, die mit der @Experimental
Annotation auf der Klasse oder Methodenebene gekennzeichnet sind, ändert sich mit ziemlicher Sicherheit. Sie können jederzeit in irgendeiner Weise oder sogar entfernt werden. Sie sollten sie in einem Produktionscode nicht verwenden oder auf sie verlassen. Sie dürfen nur breite Tests und Feedback zulassen.
APIs, die mit der @Deprecated
Annotation auf der Klasse oder der Methodenebene gekennzeichnet sind, bleibt bis zur nächsten größeren Veröffentlichung unterstützt. Es wird jedoch empfohlen, sie nicht mehr zu verwenden.
Alle Code in der io.reactivex.rxjava3.internal.*
Pakete werden als private API angesehen und sollten überhaupt nicht verlassen werden. Es kann sich jederzeit ändern.
http://reactivex.io/RxJava/3.x/javadoc/3.xy/
Binärdateien und Abhängigkeitsinformationen für Maven, Ivy, Gradle und andere finden Sie unter http://search.maven.org.
Beispiel für Gradle:
implementation ' io.reactivex.rxjava3:rxjava:x.y.z '
und für Maven:
< dependency >
< groupId >io.reactivex.rxjava3</ groupId >
< artifactId >rxjava</ artifactId >
< version >x.y.z</ version >
</ dependency >
und für Ivy:
< dependency org = " io.reactivex.rxjava3 " name = " rxjava " rev = " x.y.z " />
Snapshots nach dem 1. Mai 2021 sind über https://oss.sonatype.org/content/repoories/snapshots/io/reactivex/rxjava3/rxjava/ erhältlich
repositories {
maven { url ' https://oss.sonatype.org/content/repositories/snapshots ' }
}
dependencies {
implementation ' io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT '
}
Javadoc -Schnappschüsse sind unter http://reactivex.io/rxjava/3.x/javadoc/snapshot erhältlich
Zu bauen:
$ git clone [email protected]:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build
Weitere Details zum Gebäude finden Sie auf der Seite "Erste Schritte" des Wiki.
Für Fehler, Fragen und Diskussionen verwenden Sie bitte die GitHub -Probleme.
Copyright (c) 2016-present, RxJava Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.