RXJAVA เป็นการใช้งาน Java VM ของส่วนขยายปฏิกิริยา: ไลบรารีสำหรับการเขียนโปรแกรมแบบอะซิงโครนัสและเหตุการณ์โดยใช้ลำดับที่สังเกตได้
มันขยายรูปแบบผู้สังเกตการณ์เพื่อรองรับลำดับของข้อมูล/เหตุการณ์และเพิ่มตัวดำเนินการที่ช่วยให้คุณสามารถเขียนลำดับเข้าด้วยกันได้อย่างชัดเจน
เรียนรู้เพิ่มเติมเกี่ยวกับ RXJAVA โดยทั่วไปในบ้านวิกิ
โปรดอ่านสิ่งที่แตกต่างใน 3.0 สำหรับรายละเอียดเกี่ยวกับการเปลี่ยนแปลงและข้อมูลการย้ายถิ่นเมื่ออัพเกรดจาก 2.x
รุ่น 2.x เป็นช่วงสิ้นสุดชีวิต ณ วันที่ 28 กุมภาพันธ์ 2564 จะไม่มีการพัฒนาเพิ่มเติมการสนับสนุนการบำรุงรักษา PRS และการอัปเดตจะเกิดขึ้น Javadoc ของเวอร์ชันล่าสุด 2.2.21 จะยังคงเข้าถึงได้
เวอร์ชัน 1.x เป็นจุดสิ้นสุดของชีวิต ณ วันที่ 31 มีนาคม 2018 จะไม่มีการพัฒนาเพิ่มเติมการสนับสนุนการบำรุงรักษา PRS และการอัปเดตจะเกิดขึ้น Javadoc ของเวอร์ชันล่าสุด 1.3.8 จะยังคงเข้าถึงได้
ขั้นตอนแรกคือการรวม RXJAVA 3 ลงในโครงการของคุณตัวอย่างเช่นการพึ่งพาการรวบรวม Gradle:
implementation " io.reactivex.rxjava3:rxjava:3.x.y "
(โปรดแทนที่ x
และ y
ด้วยหมายเลขเวอร์ชันล่าสุด :)
ประการที่สองคือการเขียนโปรแกรม Hello World :
package rxjava . examples ;
import io . reactivex . rxjava3 . core .*;
public class HelloWorld {
public static void main ( String [] args ) {
Flowable . just ( "Hello world" ). subscribe ( System . out :: println );
}
}
โปรดทราบว่าตอนนี้ส่วนประกอบ RXJAVA 3 อยู่ภายใต้ io.reactivex.rxjava3
และคลาสพื้นฐานและอินเทอร์เฟซอาศัยอยู่ภายใต้ io.reactivex.rxjava3.core
RXJAVA 3 มีคลาสพื้นฐานหลายคลาสที่คุณสามารถค้นพบผู้ให้บริการได้ใน:
io.reactivex.rxjava3.core.Flowable
: 0..N กระแส, รองรับการเกิดปฏิกิริยาและแรงดันแบ็คio.reactivex.rxjava3.core.Observable
: 0..n กระแสไม่มี backpressureio.reactivex.rxjava3.core.Single
: การไหลของรายการ 1 รายการหรือข้อผิดพลาดio.reactivex.rxjava3.core.Completable
: โฟลว์ที่ไม่มีรายการ แต่มีเพียงสัญญาณที่สมบูรณ์หรือข้อผิดพลาดio.reactivex.rxjava3.core.Maybe
: โฟลว์ที่ไม่มีรายการแน่นอนว่าหนึ่งรายการหรือข้อผิดพลาดdataflows ใน RXJAVA ประกอบด้วยแหล่งที่มาเป็นศูนย์หรือมากกว่าขั้นตอนกลางตามด้วยขั้นตอนผู้บริโภคข้อมูลหรือ combinator (ซึ่งขั้นตอนมีหน้าที่รับผิดชอบในการบริโภค dataflow ด้วยวิธีการบางอย่าง):
source . operator1 (). operator2 (). operator3 (). subscribe ( consumer );
source . flatMap ( value -> source . operator1 (). operator2 (). operator3 ());
ที่นี่ถ้าเราจินตนาการถึงตัวเราเองใน operator2
การมองไปทางซ้ายไปทางแหล่งที่มาเรียกว่า ต้นน้ำ การมองไปทางขวาไปยังสมาชิก/ผู้บริโภคเรียกว่า ดาวน์สตรีม สิ่งนี้มักจะชัดเจนมากขึ้นเมื่อแต่ละองค์ประกอบถูกเขียนบนบรรทัดแยกต่างหาก:
source
. operator1 ()
. operator2 ()
. operator3 ()
. subscribe ( consumer )
ในเอกสารของ RXJAVA การปล่อย การปล่อยออกมา รายการ เหตุการณ์ สัญญาณ ข้อมูล และ ข้อความ ถือเป็นคำพ้องความหมายและแสดงวัตถุที่เดินทางไปตาม DataFlow
เมื่อ DataFlow ทำงานผ่านขั้นตอนแบบอะซิงโครนัสแต่ละขั้นตอนอาจดำเนินการสิ่งต่าง ๆ ด้วยความเร็วที่แตกต่างกัน เพื่อหลีกเลี่ยงขั้นตอนดังกล่าวอย่างล้นหลามซึ่งมักจะแสดงให้เห็นว่าการใช้หน่วยความจำที่เพิ่มขึ้นเนื่องจากการบัฟเฟอร์ชั่วคราวหรือความจำเป็นในการข้าม/การปล่อยข้อมูลที่เรียกว่า backpressure จะถูกนำไปใช้ พวกเขาพร้อมที่จะดำเนินการหรือไม่ สิ่งนี้ช่วยให้การ จำกัด การใช้หน่วยความจำของการไหลของข้อมูลในสถานการณ์ที่โดยทั่วไปไม่มีทางที่จะรู้ว่ามีกี่รายการที่ต้นน้ำจะส่งไป
ใน RXJAVA คลาส Flowable
โดยเฉพาะจะถูกกำหนดให้รองรับแรงดันแบ็คและ Observable
นั้นทุ่มเทให้กับการดำเนินการที่ไม่ได้รับการบดอัด (ลำดับสั้น ๆ การโต้ตอบ GUI ฯลฯ ) ประเภทอื่น ๆ Single
Maybe
ไม่รองรับแรงดันแบ็ค Completable
หรือไม่ควร มีที่ว่างสำหรับจัดเก็บหนึ่งรายการชั่วคราว
การเตรียมการไหลของข้อมูลโดยการใช้ผู้ให้บริการระดับกลางต่าง ๆ เกิดขึ้นใน เวลา ที่เรียกว่า:
Flowable < Integer > flow = Flowable . range ( 1 , 5 )
. map ( v -> v * v )
. filter ( v -> v % 3 == 0 )
;
ณ จุดนี้ข้อมูลยังไม่ไหลและไม่มีผลข้างเคียงเกิดขึ้น
นี่คือสถานะชั่วคราวเมื่อ subscribe()
เรียกว่าโฟลว์ที่กำหนดห่วงโซ่ของขั้นตอนการประมวลผลภายใน:
flow . subscribe ( System . out :: println )
นี่คือเมื่อมี การเรียกผลข้างเคียงของการสมัครสมาชิก (ดู doOnSubscribe
) บางแหล่งบล็อกหรือเริ่มปล่อยรายการทันทีในสถานะนี้
นี่คือสถานะเมื่อกระแสไหลออกมาอย่างแข็งขันการปล่อยรายการข้อผิดพลาดหรือสัญญาณเสร็จสมบูรณ์:
Observable . create ( emitter -> {
while (! emitter . isDisposed ()) {
long time = System . currentTimeMillis ();
emitter . onNext ( time );
if ( time % 2 != 0 ) {
emitter . onError ( new IllegalStateException ( "Odd millisecond!" ));
break ;
}
}
})
. subscribe ( System . out :: println , Throwable :: printStackTrace );
ในทางปฏิบัตินี่คือเมื่อร่างกายของตัวอย่างที่กำหนดข้างต้นดำเนินการ
หนึ่งในกรณีการใช้งานทั่วไปสำหรับ RXJAVA คือเรียกใช้การคำนวณบางอย่างคำขอเครือข่ายบนเธรดพื้นหลังและแสดงผลลัพธ์ (หรือข้อผิดพลาด) บนเธรด UI:
import io . reactivex . rxjava3 . schedulers . Schedulers ;
Flowable . fromCallable (() -> {
Thread . sleep ( 1000 ); // imitate expensive computation
return "Done" ;
})
. subscribeOn ( Schedulers . io ())
. observeOn ( Schedulers . single ())
. subscribe ( System . out :: println , Throwable :: printStackTrace );
Thread . sleep ( 2000 ); // <--- wait for the flow to finish
รูปแบบของวิธีการผูกมัดนี้เรียกว่า API ที่คล่องแคล่ว ซึ่งคล้ายกับ รูปแบบของผู้สร้าง อย่างไรก็ตามประเภทปฏิกิริยาของ Rxjava นั้นไม่เปลี่ยนรูป การโทรแต่ละวิธีจะส่งคืน Flowable
ใหม่พร้อมพฤติกรรมที่เพิ่มเข้ามา เพื่อแสดงให้เห็นตัวอย่างสามารถเขียนใหม่ได้ดังนี้:
Flowable < String > source = Flowable . fromCallable (() -> {
Thread . sleep ( 1000 ); // imitate expensive computation
return "Done" ;
});
Flowable < String > runBackground = source . subscribeOn ( Schedulers . io ());
Flowable < String > showForeground = runBackground . observeOn ( Schedulers . single ());
showForeground . subscribe ( System . out :: println , Throwable :: printStackTrace );
Thread . sleep ( 2000 );
โดยทั่วไปคุณสามารถย้ายการคำนวณหรือปิดกั้น IO ไปยังเธรดอื่น ๆ ผ่าน subscribeOn
เมื่อข้อมูลพร้อมแล้วคุณสามารถตรวจสอบให้แน่ใจว่าพวกเขาได้รับการประมวลผลบนด้ายเบื้องหน้าหรือด้าย GUI ผ่าน observeOn
ผู้ให้บริการ RXJAVA ไม่ทำงานกับ Thread
S หรือ ExecutorService
S โดยตรง แต่มี Scheduler
ที่เรียกว่าเป็นนามธรรมของแหล่งที่มาของการเกิดขึ้นพร้อมกันหลัง API เครื่องแบบ RXJAVA 3 มีตัวกำหนดตารางเวลามาตรฐานหลายตัวที่สามารถเข้าถึงได้ผ่านคลาสยูทิลิตี้ Schedulers
Schedulers.computation()
: เรียกใช้งานการคำนวณอย่างเข้มข้นในจำนวนเธรดที่ทุ่มเทจำนวนคงที่ในพื้นหลัง ตัวดำเนินการแบบอะซิงโครนัสส่วนใหญ่ใช้สิ่งนี้เป็น Scheduler
เริ่มต้นSchedulers.io()
: เรียกใช้การดำเนินการ I/O หรือการปิดกั้นในชุดเธรดที่เปลี่ยนแปลงแบบไดนามิกSchedulers.single()
: รันงานบนเธรดเดียวในลักษณะลำดับและ FIFOSchedulers.trampoline()
: ทำงานทำงานในลักษณะลำดับและ FIFO ในหนึ่งในเธรดที่เข้าร่วมโดยปกติเพื่อการทดสอบ สิ่งเหล่านี้มีอยู่ในแพลตฟอร์ม JVM ทั้งหมด แต่บางแพลตฟอร์มบางอย่างเช่น Android มีการกำหนด Scheduler
ทั่วไปของตัวเอง: AndroidSchedulers.mainThread()
, SwingScheduler.instance()
หรือ JavaFXScheduler.platform()
นอกจากนี้ยังมีตัวเลือกในการห่อ Executor
ที่มีอยู่ (และชนิดย่อยเช่น ExecutorService
) ลงใน Scheduler
ผ่าน Schedulers.from(Executor)
ตัวอย่างเช่นสิ่งนี้สามารถใช้งานได้เพื่อให้มีสระว่ายน้ำขนาดใหญ่ แต่ยังคงคงที่ของเธรด (ไม่เหมือน computation()
และ io()
ตามลำดับ)
Thread.sleep(2000);
ในตอนท้ายไม่มีอุบัติเหตุ ใน RXJAVA Scheduler
เริ่มต้น S จะทำงานบนเธรด daemon ซึ่งหมายความว่าเมื่อออกจากเธรดหลักของ Java พวกเขาทั้งหมดจะหยุดและการคำนวณพื้นหลังอาจไม่เกิดขึ้น นอนหลับอยู่พักหนึ่งในสถานการณ์ตัวอย่างนี้ช่วยให้คุณเห็นเอาต์พุตของการไหลของคอนโซลพร้อมเวลาว่าง
การไหลใน rxjava เป็นลำดับในธรรมชาติแบ่งออกเป็นขั้นตอนการประมวลผลที่อาจทำงาน ร่วมกันพร้อมกัน :
Flowable . range ( 1 , 10 )
. observeOn ( Schedulers . computation ())
. map ( v -> v * v )
. blockingSubscribe ( System . out :: println );
ตัวอย่างการไหลนี้สี่เหลี่ยมตัวเลขจาก 1 ถึง 10 ใน Scheduler
การคำนวณ และใช้ผลลัพธ์ในเธรด "หลัก" (แม่นยำยิ่งขึ้นเธรดผู้โทรของ blockingSubscribe
) อย่างไรก็ตาม Lambda v -> v * v
ไม่ทำงานแบบขนานสำหรับการไหลนี้ มันได้รับค่า 1 ถึง 10 ในเธรดการคำนวณเดียวกันหนึ่งหลังจากนั้น
การประมวลผลหมายเลข 1 ถึง 10 ในแบบคู่ขนานนั้นมีส่วนเกี่ยวข้องมากขึ้น:
Flowable . range ( 1 , 10 )
. flatMap ( v ->
Flowable . just ( v )
. subscribeOn ( Schedulers . computation ())
. map ( w -> w * w )
)
. blockingSubscribe ( System . out :: println );
ในทางปฏิบัติความเท่าเทียมใน Rxjava หมายถึงการไหลของกระแสอิสระและรวมผลลัพธ์กลับคืนสู่การไหลเพียงครั้งเดียว ผู้ประกอบการ flatMap
ทำสิ่งนี้โดยการแมปแรกแต่ละหมายเลขตั้งแต่ 1 ถึง 10 เป็นตัวของตัวเอง Flowable
ของตัวเองวิ่งและรวมสี่เหลี่ยมที่คำนวณได้
อย่างไรก็ตามโปรดทราบว่า flatMap
ไม่รับประกันคำสั่งซื้อใด ๆ และรายการจากกระแสด้านในอาจจบลงด้วย interleaved มีผู้ให้บริการทางเลือก:
concatMap
ที่แผนที่และเรียกใช้หนึ่งการไหลภายในในแต่ละครั้งและconcatMapEager
ซึ่งเรียกใช้การไหลภายในทั้งหมด "ในครั้งเดียว" แต่กระแสเอาต์พุตจะอยู่ในลำดับที่ไหลเข้าด้านในนั้นถูกสร้างขึ้น อีกทางเลือกหนึ่งตัวดำเนินการ Flowable.parallel()
และประเภท ParallelFlowable
ช่วยให้ได้รูปแบบการประมวลผลแบบขนานเดียวกัน:
Flowable . range ( 1 , 10 )
. parallel ()
. runOn ( Schedulers . computation ())
. map ( v -> v * v )
. sequential ()
. blockingSubscribe ( System . out :: println );
flatMap
เป็นผู้ให้บริการที่ทรงพลังและช่วยในสถานการณ์ต่างๆ ตัวอย่างเช่นเมื่อได้รับบริการที่ส่งคืน Flowable
เราต้องการเรียกใช้บริการอื่นที่มีค่าที่ปล่อยออกมาโดยบริการแรก:
Flowable < Inventory > inventorySource = warehouse . getInventoryAsync ();
inventorySource
. flatMap ( inventoryItem -> erp . getDemandAsync ( inventoryItem . getId ())
. map ( demand -> "Item " + inventoryItem . getName () + " has demand " + demand ))
. subscribe ( System . out :: println );
บางครั้งเมื่อมีรายการที่มีอยู่เราก็อยากจะทำการคำนวณบางอย่างขึ้นอยู่กับมัน บางครั้งเรียกว่าความ ต่อเนื่อง และขึ้นอยู่กับสิ่งที่ควรเกิดขึ้นและประเภทที่เกี่ยวข้องอาจเกี่ยวข้องกับผู้ให้บริการหลายรายเพื่อให้สำเร็จ
สถานการณ์ทั่วไปที่สุดคือการให้คุณค่าเรียกใช้บริการอื่นรอและดำเนินการต่อด้วยผลลัพธ์:
service . apiCall ()
. flatMap ( value -> service . anotherApiCall ( value ))
. flatMap ( next -> service . finalCall ( next ))
มักจะเป็นกรณีที่ลำดับในภายหลังจะต้องใช้ค่าจากการแมปก่อนหน้า สิ่งนี้สามารถทำได้โดยการย้าย flatMap
ด้านนอกไปยังส่วนด้านในของ flatMap
ก่อนหน้าเช่น:
service . apiCall ()
. flatMap ( value ->
service . anotherApiCall ( value )
. flatMap ( next -> service . finalCallBoth ( value , next ))
)
ที่นี่ value
ดั้งเดิมจะมีอยู่ใน flatMap
ด้านในซึ่งได้รับความอนุเคราะห์จากการจับตัวแปรแลมบ์ดา
ในสถานการณ์อื่น ๆ ผลลัพธ์ของแหล่งที่มา/dataflow แรกนั้นไม่เกี่ยวข้องและหนึ่งต้องการที่จะดำเนินการต่อกับเสมือนเป็นอิสระอีกแหล่งหนึ่ง ที่นี่ flatMap
ใช้งานได้เช่นกัน:
Observable continued = sourceObservable . flatMapSingle ( ignored -> someSingleSource )
continued . map ( v -> v . toString ())
. subscribe ( System . out :: println , Throwable :: printStackTrace );
อย่างไรก็ตามความต่อเนื่องในกรณีนี้ยังคง Observable
แทนที่จะเป็น Single
ที่เหมาะสมกว่า (สิ่งนี้สามารถเข้าใจได้เนื่องจากจากมุมมองของ flatMapSingle
, sourceObservable
เป็นแหล่งที่มาหลายค่าและทำให้การแมปอาจส่งผลให้มีค่าหลายค่าเช่นกัน)
บ่อยครั้งแม้ว่าจะมีวิธีที่ค่อนข้างแสดงออก (และค่าใช้จ่ายที่ต่ำกว่า) โดยใช้ Completable
ในฐานะผู้ไกล่เกลี่ยและผู้ประกอบการ andThen
ก็กลับมามีอย่างอื่น:
sourceObservable
. ignoreElements () // returns Completable
. andThen ( someSingleSource )
. map ( v -> v . toString ())
การพึ่งพาเพียงอย่างเดียวระหว่าง sourceObservable
และ someSingleSource
คืออดีตควรเสร็จสิ้นตามปกติเพื่อให้การบริโภคหลัง
บางครั้งมีการพึ่งพาข้อมูลโดยนัยระหว่างลำดับก่อนหน้าและลำดับใหม่ที่ด้วยเหตุผลบางอย่างไม่ไหลผ่าน "ช่องทางปกติ" มีแนวโน้มที่จะเขียนความต่อเนื่องดังกล่าวดังนี้:
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . just ( count . get ()))
. subscribe ( System . out :: println );
น่าเสียดายที่สิ่งนี้พิมพ์ 0
เพราะ Single.just(count.get())
ได้รับการประเมินใน เวลาประกอบ เมื่อ DataFlow ยังไม่ทำงาน เราต้องการบางสิ่งบางอย่างที่ป้องกันการประเมินแหล่ง Single
นี้จนถึง รันไทม์ เมื่อแหล่งข้อมูลหลักเสร็จสมบูรณ์:
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . defer (() -> Single . just ( count . get ())))
. subscribe ( System . out :: println );
หรือ
AtomicInteger count = new AtomicInteger ();
Observable . range ( 1 , 10 )
. doOnNext ( ignored -> count . incrementAndGet ())
. ignoreElements ()
. andThen ( Single . fromCallable (() -> count . get ()))
. subscribe ( System . out :: println );
บางครั้งแหล่งที่มาหรือบริการส่งคืนประเภทที่แตกต่างจากโฟลว์ที่ควรจะทำงานกับมัน ตัวอย่างเช่นในตัวอย่างสินค้าคงคลังข้างต้น getDemandAsync
สามารถส่งคืน Single<DemandRecord>
หากตัวอย่างรหัสไม่เปลี่ยนแปลงสิ่งนี้จะส่งผลให้เกิดข้อผิดพลาดในการรวบรวมเวลา (อย่างไรก็ตามบ่อยครั้งที่มีข้อความแสดงข้อผิดพลาดที่ทำให้เข้าใจผิดเกี่ยวกับการขาดการโอเวอร์โหลด)
ในสถานการณ์เช่นนี้มักจะมีสองตัวเลือกในการแก้ไขการแปลง: 1) แปลงเป็นประเภทที่ต้องการหรือ 2) ค้นหาและใช้ overload ของผู้ปฏิบัติงานเฉพาะที่รองรับประเภทที่แตกต่างกัน
ตัวดำเนินการคลาสพื้นฐานแบบรีแอกทีฟแต่ละตัวดำเนินการที่สามารถทำการแปลงดังกล่าวรวมถึงการแปลงโปรโตคอลเพื่อให้ตรงกับประเภทอื่น ๆ เมทริกซ์ต่อไปนี้แสดงตัวเลือกการแปลงที่มีอยู่:
ไหลได้ | สังเกตได้ | เดี่ยว | อาจจะ | ซึ่งเสร็จได้ | |
---|---|---|---|---|---|
ไหลได้ | toObservable | first firstOrError , single , singleOrError , last , lastOrError 1 | firstElement , singleElement , lastElement | ignoreElements | |
สังเกตได้ | toFlowable 2 | first firstOrError , single , singleOrError , last , lastOrError 1 | firstElement , singleElement , lastElement | ignoreElements | |
เดี่ยว | toFlowable 3 | toObservable | toMaybe | ignoreElement | |
อาจจะ | toFlowable 3 | toObservable | toSingle | ignoreElement | |
ซึ่งเสร็จได้ | toFlowable | toObservable | toSingle | toMaybe |
1 : เมื่อเปลี่ยนแหล่งที่มาหลายค่าเป็นแหล่งที่มีค่าเดียวเราควรตัดสินใจว่าค่าแหล่งที่มาของแหล่งที่มาใดควรได้รับการพิจารณาว่าเป็นผลลัพธ์
2 : การเปลี่ยนสิ่ง Observable
กลายเป็น Flowable
ต้องมีการตัดสินใจเพิ่มเติม: จะทำอย่างไรกับการไหลของแหล่งกำเนิดที่ไม่ จำกัด Observable
? มีกลยุทธ์หลายอย่าง (เช่นบัฟเฟอร์ลดลงการรักษาล่าสุด) ผ่านพารามิเตอร์ BackpressureStrategy
หรือผ่านตัวดำเนินการ Flowable
มาตรฐานเช่น onBackpressureBuffer
, onBackpressureDrop
, onBackpressureLatest
3 : เมื่อมีเพียงรายการเดียว (มากที่สุด) หนึ่งรายการไม่มีปัญหากับแรงดันแบ็คแรงดันเนื่องจากสามารถเก็บไว้ได้เสมอจนกว่าจะถึงปลายน้ำพร้อมที่จะบริโภค
ผู้ให้บริการที่ใช้บ่อยจำนวนมากมีโอเวอร์โหลดที่สามารถจัดการกับประเภทอื่น ๆ ได้ เหล่านี้มักจะตั้งชื่อด้วยคำต่อท้ายของประเภทเป้าหมาย:
ผู้ดำเนินการ | โอเวอร์โหลด |
---|---|
flatMap | flatMapSingle , flatMapMaybe , flatMapCompletable , flatMapIterable |
concatMap | concatMapSingle , concatMapMaybe , concatMapCompletable , concatMapIterable |
switchMap | switchMapSingle , switchMapMaybe , switchMapCompletable |
เหตุผลที่ผู้ประกอบการเหล่านี้มีคำต่อท้ายแทนที่จะมีชื่อเดียวกันกับลายเซ็นที่แตกต่างกันคือการลบประเภท Java ไม่ได้พิจารณาลายเซ็นเช่น operator(Function<T, Single<R>>)
และ operator(Function<T, Maybe<R>>)
แตกต่างกัน (ต่างจาก C#) และเนื่องจากการลบ operator
ทั้งสองจะจบลง เป็นวิธีที่ซ้ำกันที่มีลายเซ็นเดียวกัน
การตั้งชื่อในการเขียนโปรแกรมเป็นหนึ่งในสิ่งที่ยากที่สุดเนื่องจากชื่อคาดว่าจะไม่นานแสดงออกจับภาพและน่าจดจำได้ง่าย น่าเสียดายที่ภาษาเป้าหมาย (และอนุสัญญาที่มีอยู่แล้ว) อาจไม่ได้รับความช่วยเหลือมากเกินไปในเรื่องนี้ (คำหลักที่ใช้ไม่ได้, การลบประเภท, ความคลุมเครือพิมพ์ ฯลฯ )
ใน RX.NET ดั้งเดิมตัวดำเนินการที่ปล่อยรายการเดียวแล้วเสร็จสิ้นจะเรียกว่า Return(T)
เนื่องจากอนุสัญญา Java คือการมีตัวอักษรตัวพิมพ์เล็กเริ่มชื่อวิธีการนี้จะได้รับ return(T)
ซึ่งเป็นคำหลักใน Java และไม่สามารถใช้ได้ ดังนั้น Rxjava จึงเลือกชื่อผู้ให้บริการนี้ just(T)
ข้อ จำกัด เดียวกันนี้มีอยู่สำหรับ Switch
ของผู้ปฏิบัติงานซึ่งจะต้องมีชื่อว่า switchOnNext
อีกตัวอย่างหนึ่งคือ Catch
ซึ่งมีชื่อว่า onErrorResumeNext
ผู้ให้บริการหลายคนที่คาดหวังว่าผู้ใช้จะให้ฟังก์ชั่นบางอย่างที่ส่งคืนประเภทปฏิกิริยาไม่สามารถโหลดมากเกินไปได้เนื่องจากการลบประเภทรอบ Function<T, X>
เปลี่ยนวิธีการดังกล่าวให้กลายเป็นซ้ำ RXJAVA เลือกที่จะตั้งชื่อผู้ให้บริการดังกล่าวโดยเพิ่มประเภทเป็นคำต่อท้ายเช่นกัน:
Flowable < R > flatMap ( Function <? super T , ? extends Publisher <? extends R >> mapper )
Flowable < R > flatMapMaybe ( Function <? super T , ? extends MaybeSource <? extends R >> mapper )
แม้ว่าผู้ให้บริการบางรายจะไม่มีปัญหาจากการลบประเภท แต่ลายเซ็นของพวกเขาอาจกลายเป็นความคลุมเครือโดยเฉพาะอย่างยิ่งหากมีการใช้ Java 8 และ Lambdas ตัวอย่างเช่นมีการ overloads ของ concatWith
ที่ใช้ประเภทฐานปฏิกิริยาอื่น ๆ อีกประเภทหนึ่งเป็นข้อโต้แย้ง (เพื่อให้ความสะดวกและผลประโยชน์ด้านประสิทธิภาพในการดำเนินการพื้นฐาน):
Flowable < T > concatWith ( Publisher <? extends T > other );
Flowable < T > concatWith ( SingleSource <? extends T > other );
ทั้ง Publisher
และ SingleSource
ปรากฏเป็นอินเทอร์เฟซที่ใช้งานได้ (ประเภทที่มีวิธีการนามธรรมหนึ่งวิธี) และอาจกระตุ้นให้ผู้ใช้พยายามที่จะให้นิพจน์แลมบ์ดา:
someSource . concatWith ( s -> Single . just ( 2 ))
. subscribe ( System . out :: println , Throwable :: printStackTrace );
น่าเสียดายที่วิธีการนี้ใช้งานไม่ได้และตัวอย่างไม่ได้พิมพ์ 2
เลย ในความเป็นจริงเนื่องจากเวอร์ชัน 2.1.10 มันไม่ได้รวบรวมเนื่องจากมีอย่างน้อย 4 concatWith
overloads มีอยู่และคอมไพเลอร์พบรหัสด้านบนคลุมเครือ
ผู้ใช้ในสถานการณ์ดังกล่าวอาจต้องการเลื่อนการคำนวณบางอย่างจนกว่า someSource
จะเสร็จสมบูรณ์ดังนั้นผู้ดำเนินการที่ไม่คลุมเครือควรถูก defer
:
someSource . concatWith ( Single . defer (() -> Single . just ( 2 )))
. subscribe ( System . out :: println , Throwable :: printStackTrace );
บางครั้งมีการเพิ่มคำต่อท้ายเพื่อหลีกเลี่ยงความคลุมเครือเชิงตรรกะที่อาจรวบรวม แต่สร้างประเภทที่ไม่ถูกต้องในการไหล:
Flowable < T > merge ( Publisher <? extends Publisher <? extends T >> sources );
Flowable < T > mergeArray ( Publisher <? extends T >... sources );
สิ่งนี้จะได้รับความคลุมเครือเมื่อประเภทอินเทอร์เฟซที่ใช้งานได้มีส่วนร่วมเป็นอาร์กิวเมนต์ประเภท T
dataflows อาจล้มเหลว ณ จุดนั้นข้อผิดพลาดจะถูกปล่อยออกมาให้กับผู้บริโภค บางครั้งแม้ว่าหลายแหล่งอาจล้มเหลว ณ จุดนั้นมีทางเลือกไม่ว่าจะรอให้พวกเขาทั้งหมดเสร็จสมบูรณ์หรือล้มเหลว เพื่อระบุโอกาสนี้ชื่อผู้ประกอบการจำนวนมากจะถูกต่อท้ายด้วยคำ DelayError
(ในขณะที่คนอื่น ๆ มี delayError
หรือ delayErrors
บูลีนในหนึ่งในโอเวอร์โหลดของพวกเขา):
Flowable < T > concat ( Publisher <? extends Publisher <? extends T >> sources );
Flowable < T > concatDelayError ( Publisher <? extends Publisher <? extends T >> sources );
แน่นอนคำต่อท้ายประเภทต่าง ๆ อาจปรากฏขึ้นพร้อมกัน:
Flowable < T > concatArrayEagerDelayError ( Publisher <? extends T >... sources );
คลาสพื้นฐานสามารถพิจารณาได้อย่างหนักเนื่องจากจำนวนวิธีการคงที่และอินสแตนซ์ที่แท้จริง การออกแบบของ RXJAVA 3 ได้รับอิทธิพลอย่างมากจากข้อกำหนดของสตรีมปฏิกิริยาดังนั้นไลบรารีจึงมีคลาสและอินเทอร์เฟซต่อแต่ละประเภทปฏิกิริยา:
พิมพ์ | ระดับ | ส่วนต่อประสาน | ผู้บริโภค |
---|---|---|---|
0..N backpressured | Flowable | Publisher 1 | Subscriber |
0..N ไม่มีขอบเขต | Observable | ObservableSource 2 | Observer |
1 องค์ประกอบหรือข้อผิดพลาด | Single | SingleSource | SingleObserver |
0..1 องค์ประกอบหรือข้อผิดพลาด | Maybe | MaybeSource | MaybeObserver |
0 องค์ประกอบหรือข้อผิดพลาด | Completable | CompletableSource | CompletableObserver |
1 org.reactivestreams.Publisher
เป็นส่วนหนึ่งของไลบรารีสตรีมปฏิกิริยาภายนอก มันเป็นประเภทหลักในการโต้ตอบกับไลบรารีปฏิกิริยาอื่น ๆ ผ่านกลไกมาตรฐานที่ควบคุมโดยข้อกำหนดของสตรีมปฏิกิริยา
2 อนุสัญญาการตั้งชื่อของอินเทอร์เฟซคือการต่อท้าย Source
ของชื่อคลาสกึ่งดั้งเดิม ไม่มี FlowableSource
เนื่องจาก Publisher
จัดทำโดย Library reactive Streams (และประเภทย่อยมันจะไม่ได้ช่วยในการทำงานร่วมกันเช่นกัน) อย่างไรก็ตามอินเทอร์เฟซเหล่านี้ไม่ได้เป็นมาตรฐานในความหมายของข้อกำหนดของสตรีมปฏิกิริยาและปัจจุบันเป็นเฉพาะ Rxjava เท่านั้น
โดยค่าเริ่มต้น RXJAVA เองไม่จำเป็นต้องมีการตั้งค่า proguard/R8 และควรทำงานโดยไม่มีปัญหา น่าเสียดายที่การพึ่งพาสตรีมปฏิกิริยาตั้งแต่เวอร์ชัน 1.0.3 ได้ฝังไฟล์คลาส Java 9 ในขวดที่สามารถทำให้เกิดคำเตือนด้วย proguard ธรรมดา:
Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher
ขอแนะนำให้ตั้งค่ารายการ -dontwarn
ต่อไปนี้ในไฟล์ proguard-ruleset
ของแอปพลิเคชัน:
-dontwarn java.util.concurrent.Flow*
สำหรับ R8 ขวด Rxjava รวมถึง META-INF/proguard/rxjava3.pro
ที่มีประโยคที่ไม่มีการเตือนแบบเดียวกันและควรใช้โดยอัตโนมัติ
สำหรับรายละเอียดเพิ่มเติมปรึกษาวิกิ
เวอร์ชัน 3.x อยู่ในระหว่างการพัฒนา BugFixes จะถูกนำไปใช้กับทั้ง 2.x และ 3.x สาขา แต่คุณสมบัติใหม่จะถูกเพิ่มลงใน 3.x เท่านั้น
การเพิ่มขึ้นเล็กน้อย 3.x (เช่น 3.1, 3.2, ฯลฯ ) จะเกิดขึ้นเมื่อมีการเพิ่มฟังก์ชั่นใหม่ที่ไม่สำคัญหรือการปรับปรุงที่สำคัญหรือการแก้ไขข้อผิดพลาดเกิดขึ้นซึ่งอาจมีการเปลี่ยนแปลงพฤติกรรมที่อาจส่งผลกระทบต่อบางกรณี (เช่นการพึ่งพาพฤติกรรมที่เกิดจาก ข้อผิดพลาด) ตัวอย่างของการปรับปรุงที่จะจำแนกเนื่องจากนี่คือการเพิ่มการรองรับแรงดันแบ็คเจอร์แบบดึงปฏิกิริยาให้กับผู้ปฏิบัติงานที่ก่อนหน้านี้ไม่รองรับ สิ่งนี้ควรย้อนหลังเข้ากันได้ แต่มีพฤติกรรมที่แตกต่างกัน
Patch 3.xy เพิ่มขึ้น (เช่น 3.0.0 -> 3.0.1, 3.3.1 -> 3.3.2, ฯลฯ ) จะเกิดขึ้นสำหรับการแก้ไขข้อบกพร่องและฟังก์ชันการทำงานเล็กน้อย (เช่นการเพิ่มวิธีการโอเวอร์โหลด) ฟังก์ชั่นใหม่ที่ทำเครื่องหมายด้วยคำอธิบายประกอบ @Beta
หรือ @Experimental
สามารถเพิ่มในการเผยแพร่แพตช์เพื่อให้การสำรวจอย่างรวดเร็วและการทำซ้ำของฟังก์ชั่นใหม่ที่ไม่แน่นอน
APIs ที่ทำเครื่องหมายด้วยคำอธิบายประกอบ @Beta
ที่ระดับชั้นเรียนหรือวิธีการอาจมีการเปลี่ยนแปลง พวกเขาสามารถแก้ไขได้ในทางใดทางหนึ่งหรือแม้กระทั่งลบได้ตลอดเวลา หากรหัสของคุณเป็นไลบรารีของตัวเอง (เช่นใช้กับ ClassPath ของผู้ใช้ที่อยู่นอกการควบคุมของคุณ) คุณไม่ควรใช้ Beta APIs เว้นแต่คุณจะบรรจุใหม่ (เช่นการใช้ proguard, การแรเงา ฯลฯ )
APIs ที่ทำเครื่องหมายด้วยคำอธิบายประกอบ @Experimental
ที่ระดับชั้นเรียนหรือวิธีการจะเปลี่ยนไปอย่างแน่นอน พวกเขาสามารถแก้ไขได้ในทางใดทางหนึ่งหรือแม้กระทั่งลบได้ตลอดเวลา คุณไม่ควรใช้หรือพึ่งพาพวกเขาในรหัสการผลิตใด ๆ พวกเขาล้วนๆเพื่อให้การทดสอบและข้อเสนอแนะในวงกว้าง
APIs ที่ทำเครื่องหมายด้วยคำอธิบายประกอบ @Deprecated
ที่ระดับชั้นเรียนหรือวิธีการจะยังคงได้รับการสนับสนุนจนกว่าจะมีการเปิดตัวครั้งใหญ่ครั้งต่อไป แต่ขอแนะนำให้หยุดใช้
รหัสทั้งหมดภายใน io.reactivex.rxjava3.internal.*
แพ็คเกจถือว่าเป็น API ส่วนตัวและไม่ควรพึ่งพาเลย มันสามารถเปลี่ยนแปลงได้ตลอดเวลา
http://reactivex.io/RxJava/3.x/javadoc/3.xy/
ไบนารีและข้อมูลการพึ่งพาสำหรับ Maven, Ivy, Gradle และอื่น ๆ สามารถดูได้ที่ http://search.maven.org
ตัวอย่างสำหรับ Gradle:
implementation ' io.reactivex.rxjava3:rxjava:x.y.z '
และสำหรับ Maven:
< dependency >
< groupId >io.reactivex.rxjava3</ groupId >
< artifactId >rxjava</ artifactId >
< version >x.y.z</ version >
</ dependency >
และสำหรับไม้เลื้อย:
< dependency org = " io.reactivex.rxjava3 " name = " rxjava " rev = " x.y.z " />
สแน็ปช็อตหลังวันที่ 1 พฤษภาคม 2021 มีให้บริการผ่าน https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/
repositories {
maven { url ' https://oss.sonatype.org/content/repositories/snapshots ' }
}
dependencies {
implementation ' io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT '
}
Javadoc Snapshots มีอยู่ที่ http://reactivex.io/rxjava/3.x/javadoc/snapshot
เพื่อสร้าง:
$ git clone [email protected]:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build
รายละเอียดเพิ่มเติมเกี่ยวกับการสร้างสามารถพบได้ในหน้าเริ่มต้นของวิกิ
สำหรับข้อบกพร่องคำถามและการอภิปรายโปรดใช้ปัญหา GitHub
Copyright (c) 2016-present, RxJava Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.