Rmessage est une API push de messages en temps réel construite sur le projet Reactor-Netty utilisant Reactor3.
Qu’est-ce que Reactor3 ?
Reactor est un cadre de programmation réactif totalement non bloquant pour la JVM, avec des capacités efficaces de gestion de la demande (c'est-à-dire le contrôle de la « contre-pression »). Il s'intègre directement aux API fonctionnelles Java 8 telles que CompletableFuture, Stream et Duration. Il fournit la séquence asynchrone API Flux (pour les éléments [N]) et Mono (pour les éléments [0|1]), et suit et implémente entièrement la « Spécification des extensions réactives ».
Quels sont les avantages d’utiliser Reactor ?
Il est très facile de créer du code asynchrone pur à haut débit et peut intégrer de manière transparente les projets spring5 [webflux].
Pour utiliser Rmessage, vous devez gérer en externe les relations entre les utilisateurs du groupe et le stockage des messages hors ligne. Les tests peuvent utiliser la mémoire du gestionnaire par défaut pour conserver les messages hors ligne. L'ensemble du projet est développé à partir d'idées de programmation purement asynchrone, visant à apprendre la programmation réactive.
ServerStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. onReadIdle ( 10000l ) //设置读心跳时间
. onWriteIdle ( 10000l ) //设置写心跳时间
. option ( ChannelOption . SO_RCVBUF , 1023 )
. interceptor ( frame -> frame , frame -> frame ) // 拦截所有message
. setAfterChannelInit ( channel -> { // channel设置
})
. connect ()
. cast ( TcpServerSession . class )
. subscribe ( session ->{
session . addGroupHandler ( groupId -> null ). subscribe ();
session . addOfflineHandler ( new DefaultOffMessageHandler ()). subscribe ();
session . addUserHandler ( new DefaultUserTransportHandler ());
});
ClientStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. userId ( "21344" ) //设置用户名
. password ( "12312" ) //设置密码
. onReadIdle ( 10000l ,()->()-> System . out . println ( "心跳了" )) //设置读心跳,以及设置回调runner
. setClientType ( ClientType . Ios ) //设置客户端类型
. setAfterChannelInit ( channel -> {
// channel设置
})
. connect ()
. cast ( TcpClientSession . class )
. subscribe ( session ->{
session . sendPoint ( "123" , "测试一下哦" ). subscribe (); //发送单聊消息
session . sendGroup ( "group1" , "123" ). subscribe (); // 发送群聊消息
session . accept ( message -> {
}); // 接受所有消息
});
FixHeader【1 octet】
type_client | type_message |
---|---|
haut 4 bits | faible 4 bits |
Sujet 【n octets】
de la longueur | à la longueur | depuis | à |
---|---|---|---|
1 octet | 1 octet | n octets | n octets |
Corps [n octets]
longueur du corps | corps |
---|---|
2 octets | n octets |
horodatage |
---|
8 octets |
FixHeader【1 octet】
type_client | type_message |
---|---|
haut 4 bits | faible 4 bits |
État de connexion [n octet]
longueur de l'utilisateur | longueur du mot de passe | utilisateur | mot de passe |
---|---|---|---|
1 octet | 1 octet | n octets | n octets |
FixHeader【1 octet】
type_client | type_message |
---|---|
haut 4 bits | faible 4 bits |
État de connexion [n octet]
longueur de l'utilisateur | utilisateur |
---|---|
1 octet | n octets |
FixHeader【1 octet】
type_client | type_message |
---|---|
haut 4 bits | faible 4 bits |