Rmessage es una API de envío de mensajes en tiempo real construida en el proyecto reactor-netty utilizando Reactor3.
¿Qué es Reactor3?
Reactor es un marco de programación reactiva completamente sin bloqueo para JVM, con capacidades eficientes de gestión de la demanda (es decir, control de "contrapresión"). Se integra directamente con las API funcionales de Java 8, como CompletableFuture, Stream y Duration. Proporciona la secuencia asincrónica API Flux (para [N] elementos) y Mono (para [0|1] elementos), y sigue e implementa completamente la "Especificación de extensiones reactivas".
¿Cuáles son los beneficios de utilizar Reactor?
Es muy fácil crear código asincrónico puro de alto rendimiento y puede integrar perfectamente proyectos spring5 [webflux].
Para usar Rmessage, debe administrar externamente las relaciones de usuarios del grupo y el almacenamiento de mensajes sin conexión. Rmessage no proporciona persistencia. Las pruebas pueden usar la memoria del controlador predeterminada para retener mensajes sin conexión. Todo el proyecto se desarrolla utilizando ideas de programación puramente asíncrona, con el objetivo de aprender programación reactiva.
ServerStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. onReadIdle ( 10000l ) //设置读心跳时间
. onWriteIdle ( 10000l ) //设置写心跳时间
. option ( ChannelOption . SO_RCVBUF , 1023 )
. interceptor ( frame -> frame , frame -> frame ) // 拦截所有message
. setAfterChannelInit ( channel -> { // channel设置
})
. connect ()
. cast ( TcpServerSession . class )
. subscribe ( session ->{
session . addGroupHandler ( groupId -> null ). subscribe ();
session . addOfflineHandler ( new DefaultOffMessageHandler ()). subscribe ();
session . addUserHandler ( new DefaultUserTransportHandler ());
});
ClientStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. userId ( "21344" ) //设置用户名
. password ( "12312" ) //设置密码
. onReadIdle ( 10000l ,()->()-> System . out . println ( "心跳了" )) //设置读心跳,以及设置回调runner
. setClientType ( ClientType . Ios ) //设置客户端类型
. setAfterChannelInit ( channel -> {
// channel设置
})
. connect ()
. cast ( TcpClientSession . class )
. subscribe ( session ->{
session . sendPoint ( "123" , "测试一下哦" ). subscribe (); //发送单聊消息
session . sendGroup ( "group1" , "123" ). subscribe (); // 发送群聊消息
session . accept ( message -> {
}); // 接受所有消息
});
FixHeader【1 byte】
tipo_cliente | tipo_mensaje |
---|---|
alto 4 bits | bajo 4 bits |
Tema 【n bytes】
de longitud | a la longitud | de | a |
---|---|---|---|
1 byte | 1 byte | n bytes | n bytes |
Cuerpo [n bytes]
longitud del cuerpo | cuerpo |
---|---|
2 bytes | n bytes |
sello de tiempo |
---|
8 bytes |
FixHeader【1 byte】
tipo_cliente | tipo_mensaje |
---|---|
alto 4 bits | bajo 4 bits |
Estado de conexión [n bytes]
longitud del usuario | longitud de la contraseña | usuario | contraseña |
---|---|---|---|
1 byte | 1 byte | n bytes | n bytes |
FixHeader【1 byte】
tipo_cliente | tipo_mensaje |
---|---|
alto 4 bits | bajo 4 bits |
Estado de conexión [n bytes]
longitud del usuario | usuario |
---|---|
1 byte | n bytes |
FixHeader【1 byte】
tipo_cliente | tipo_mensaje |
---|---|
alto 4 bits | bajo 4 bits |