Rmessage adalah API push pesan real-time yang dibangun pada proyek jaringan reaktor menggunakan Reactor3.
Apa itu Reactor3?
Reactor adalah kerangka pemrograman reaktif yang sepenuhnya non-pemblokiran untuk JVM, dengan kemampuan manajemen permintaan yang efisien (yaitu, kontrol "tekanan balik"). Ini terintegrasi langsung dengan API fungsional Java 8 seperti CompletableFuture, Stream, dan Duration. Ini menyediakan API Flux urutan asinkron (untuk elemen [N]) dan Mono (untuk elemen [0|1]), dan sepenuhnya mengikuti dan mengimplementasikan "Spesifikasi Ekstensi Reaktif".
Apa keuntungan menggunakan Reaktor?
Sangat mudah untuk membuat kode asinkron murni dengan throughput tinggi, dan dapat mengintegrasikan proyek spring5 [webflux] dengan mulus.
Untuk menggunakan Rmessage, Anda perlu mengelola hubungan pengguna grup secara eksternal dan penyimpanan pesan offline. Rmessage tidak menyediakan persistensi. Keseluruhan proyek dikembangkan menggunakan ide pemrograman asinkron murni, yang bertujuan untuk mempelajari pemrograman reaktif.
ServerStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. onReadIdle ( 10000l ) //设置读心跳时间
. onWriteIdle ( 10000l ) //设置写心跳时间
. option ( ChannelOption . SO_RCVBUF , 1023 )
. interceptor ( frame -> frame , frame -> frame ) // 拦截所有message
. setAfterChannelInit ( channel -> { // channel设置
})
. connect ()
. cast ( TcpServerSession . class )
. subscribe ( session ->{
session . addGroupHandler ( groupId -> null ). subscribe ();
session . addOfflineHandler ( new DefaultOffMessageHandler ()). subscribe ();
session . addUserHandler ( new DefaultUserTransportHandler ());
});
ClientStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. userId ( "21344" ) //设置用户名
. password ( "12312" ) //设置密码
. onReadIdle ( 10000l ,()->()-> System . out . println ( "心跳了" )) //设置读心跳,以及设置回调runner
. setClientType ( ClientType . Ios ) //设置客户端类型
. setAfterChannelInit ( channel -> {
// channel设置
})
. connect ()
. cast ( TcpClientSession . class )
. subscribe ( session ->{
session . sendPoint ( "123" , "测试一下哦" ). subscribe (); //发送单聊消息
session . sendGroup ( "group1" , "123" ). subscribe (); // 发送群聊消息
session . accept ( message -> {
}); // 接受所有消息
});
PerbaikiHeader 【1 byte】
tipe_klien | tipe_pesan |
---|---|
4bit tinggi | 4bit rendah |
Topik 【n byte】
dari panjang | menjadi panjang | dari | ke |
---|---|---|---|
1bita | 1bita | n byte | n byte |
Isi [n byte]
panjang tubuh | tubuh |
---|---|
2 byte | n byte |
cap waktu |
---|
8 byte |
PerbaikiHeader 【1 byte】
tipe_klien | tipe_pesan |
---|---|
4bit tinggi | 4bit rendah |
Status Koneksi [n byte]
panjang pengguna | panjang kata sandi | pengguna | kata sandi |
---|---|---|---|
1bita | 1bita | n byte | n byte |
PerbaikiHeader 【1 byte】
tipe_klien | tipe_pesan |
---|---|
4bit tinggi | 4bit rendah |
Status Koneksi [n byte]
panjang pengguna | pengguna |
---|---|
1bita | n byte |
PerbaikiHeader 【1 byte】
tipe_klien | tipe_pesan |
---|---|
4bit tinggi | 4bit rendah |