Rmessage é uma API push de mensagens em tempo real construída no projeto reactor-netty usando Reactor3.
O que é o Reactor3?
Reactor é uma estrutura de programação reativa totalmente sem bloqueio para JVM, com recursos eficientes de gerenciamento de demanda (ou seja, controle de "contrapressão"). Ele se integra diretamente com APIs funcionais Java 8, como CompletableFuture, Stream e Duration. Ele fornece a sequência assíncrona API Flux (para [N] elementos) e Mono (para [0|1] elementos) e segue e implementa totalmente a "Especificação de extensões reativas".
Quais são os benefícios de usar o Reactor?
É muito fácil construir código assíncrono puro de alto rendimento e pode integrar perfeitamente projetos spring5 [webflux].
Para usar o Rmessage, você precisa gerenciar externamente os relacionamentos de usuários do grupo e o armazenamento de mensagens off-line não fornece persistência. Os testes podem usar a memória do manipulador padrão para reter mensagens off-line. Todo o projeto é desenvolvido utilizando ideias de programação puramente assíncronas, visando aprender programação reativa.
ServerStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. onReadIdle ( 10000l ) //设置读心跳时间
. onWriteIdle ( 10000l ) //设置写心跳时间
. option ( ChannelOption . SO_RCVBUF , 1023 )
. interceptor ( frame -> frame , frame -> frame ) // 拦截所有message
. setAfterChannelInit ( channel -> { // channel设置
})
. connect ()
. cast ( TcpServerSession . class )
. subscribe ( session ->{
session . addGroupHandler ( groupId -> null ). subscribe ();
session . addOfflineHandler ( new DefaultOffMessageHandler ()). subscribe ();
session . addUserHandler ( new DefaultUserTransportHandler ());
});
ClientStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. userId ( "21344" ) //设置用户名
. password ( "12312" ) //设置密码
. onReadIdle ( 10000l ,()->()-> System . out . println ( "心跳了" )) //设置读心跳,以及设置回调runner
. setClientType ( ClientType . Ios ) //设置客户端类型
. setAfterChannelInit ( channel -> {
// channel设置
})
. connect ()
. cast ( TcpClientSession . class )
. subscribe ( session ->{
session . sendPoint ( "123" , "测试一下哦" ). subscribe (); //发送单聊消息
session . sendGroup ( "group1" , "123" ). subscribe (); // 发送群聊消息
session . accept ( message -> {
}); // 接受所有消息
});
FixHeader【1 byte】
tipo_cliente | tipo_mensagem |
---|---|
alto 4 bits | baixo 4 bits |
Tópico 【n bytes】
do comprimento | comprimento | de | para |
---|---|---|---|
1 byte | 1 byte | n bytes | n bytes |
Corpo [n bytes]
comprimento do corpo | corpo |
---|---|
2 bytes | n bytes |
carimbo de data/hora |
---|
8 bytes |
FixHeader【1 byte】
tipo_cliente | tipo_mensagem |
---|---|
alto 4 bits | baixo 4 bits |
Estado de Conexão [n byte]
comprimento do usuário | comprimento da senha | usuário | senha |
---|---|---|---|
1 byte | 1 byte | n bytes | n bytes |
FixHeader【1 byte】
tipo_cliente | tipo_mensagem |
---|---|
alto 4 bits | baixo 4 bits |
Estado de Conexão [n byte]
comprimento do usuário | usuário |
---|---|
1 byte | n bytes |
FixHeader【1 byte】
tipo_cliente | tipo_mensagem |
---|---|
alto 4 bits | baixo 4 bits |