Rmessage is a real-time message push API built on the reactor-netty project using Reactor3.
What is Reactor3?
Reactor is a completely non-blocking reactive programming framework for the JVM, with efficient demand management (i.e., control of "backpressure") capabilities. It integrates directly with Java 8 functional APIs such as CompletableFuture, Stream, and Duration. It provides the asynchronous sequence API Flux (for [N] elements) and Mono (for [0|1] elements), and fully follows and implements the "Reactive Extensions Specification".
What are the benefits of using Reactor?
It is very easy to build high-throughput pure asynchronous code, and it can seamlessly integrate spring5 [webflux] projects.
To use Rmessage, you need to externally manage group user relationships and offline message storage. Rmessage does not provide persistence. Tests can use the default Handler memory to retain offline messages. The entire project is developed using purely asynchronous programming ideas, aiming to learn reactive programming.
ServerStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. onReadIdle ( 10000l ) //设置读心跳时间
. onWriteIdle ( 10000l ) //设置写心跳时间
. option ( ChannelOption . SO_RCVBUF , 1023 )
. interceptor ( frame -> frame , frame -> frame ) // 拦截所有message
. setAfterChannelInit ( channel -> { // channel设置
})
. connect ()
. cast ( TcpServerSession . class )
. subscribe ( session ->{
session . addGroupHandler ( groupId -> null ). subscribe ();
session . addOfflineHandler ( new DefaultOffMessageHandler ()). subscribe ();
session . addUserHandler ( new DefaultUserTransportHandler ());
});
ClientStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. userId ( "21344" ) //设置用户名
. password ( "12312" ) //设置密码
. onReadIdle ( 10000l ,()->()-> System . out . println ( "心跳了" )) //设置读心跳,以及设置回调runner
. setClientType ( ClientType . Ios ) //设置客户端类型
. setAfterChannelInit ( channel -> {
// channel设置
})
. connect ()
. cast ( TcpClientSession . class )
. subscribe ( session ->{
session . sendPoint ( "123" , "测试一下哦" ). subscribe (); //发送单聊消息
session . sendGroup ( "group1" , "123" ). subscribe (); // 发送群聊消息
session . accept ( message -> {
}); // 接受所有消息
});
FixHeader【1 byte】
client_type | message_type |
---|---|
high 4bit | low 4bit |
Topic 【n bytes】
from length | to length | from | to |
---|---|---|---|
1byte | 1byte | n bytes | n bytes |
Body [n bytes]
body length | body |
---|---|
2 bytes | n bytes |
timstamp |
---|
8 bytes |
FixHeader【1 byte】
client_type | message_type |
---|---|
high 4bit | low 4bit |
ConnectionState [n byte]
user length | password length | user | password |
---|---|---|---|
1byte | 1byte | n bytes | n bytes |
FixHeader【1 byte】
client_type | message_type |
---|---|
high 4bit | low 4bit |
ConnectionState [n byte]
user length | user |
---|---|
1byte | n bytes |
FixHeader【1 byte】
client_type | message_type |
---|---|
high 4bit | low 4bit |