Rmessage ist eine Echtzeit-Nachrichten-Push-API, die auf dem Reactor-Netty-Projekt mit Reactor3 basiert.
Was ist Reactor3?
Reactor ist ein völlig blockierungsfreies reaktives Programmierframework für die JVM mit effizienten Nachfragemanagementfunktionen (d. h. Kontrolle des „Gegendrucks“). Es lässt sich direkt in Java 8-Funktions-APIs wie CompletableFuture, Stream und Duration integrieren. Es bietet die asynchrone Sequenz API Flux (für [N] Elemente) und Mono (für [0|1] Elemente) und folgt vollständig der „Reactive Extensions Specification“ und implementiert diese.
Welche Vorteile bietet die Verwendung von Reactor?
Es ist sehr einfach, reinen asynchronen Code mit hohem Durchsatz zu erstellen und Spring5-Projekte [Webflux] nahtlos zu integrieren.
Um Rmessage zu verwenden, müssen Sie Gruppenbenutzerbeziehungen extern verwalten und Rmessage bietet keine Persistenz. Tests können den Standard-Handler-Speicher verwenden, um Offline-Nachrichten aufzubewahren. Das gesamte Projekt wird unter Verwendung rein asynchroner Programmierideen entwickelt, mit dem Ziel, reaktives Programmieren zu erlernen.
ServerStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. onReadIdle ( 10000l ) //设置读心跳时间
. onWriteIdle ( 10000l ) //设置写心跳时间
. option ( ChannelOption . SO_RCVBUF , 1023 )
. interceptor ( frame -> frame , frame -> frame ) // 拦截所有message
. setAfterChannelInit ( channel -> { // channel设置
})
. connect ()
. cast ( TcpServerSession . class )
. subscribe ( session ->{
session . addGroupHandler ( groupId -> null ). subscribe ();
session . addOfflineHandler ( new DefaultOffMessageHandler ()). subscribe ();
session . addUserHandler ( new DefaultUserTransportHandler ());
});
ClientStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. userId ( "21344" ) //设置用户名
. password ( "12312" ) //设置密码
. onReadIdle ( 10000l ,()->()-> System . out . println ( "心跳了" )) //设置读心跳,以及设置回调runner
. setClientType ( ClientType . Ios ) //设置客户端类型
. setAfterChannelInit ( channel -> {
// channel设置
})
. connect ()
. cast ( TcpClientSession . class )
. subscribe ( session ->{
session . sendPoint ( "123" , "测试一下哦" ). subscribe (); //发送单聊消息
session . sendGroup ( "group1" , "123" ). subscribe (); // 发送群聊消息
session . accept ( message -> {
}); // 接受所有消息
});
FixHeader【1 Byte】
client_type | Nachrichtentyp |
---|---|
hohe 4bit | niedrige 4bit |
Thema 【n Bytes】
von der Länge | auf Länge | aus | Zu |
---|---|---|---|
1 Byte | 1 Byte | n Bytes | n Bytes |
Körper [n Bytes]
Körperlänge | Körper |
---|---|
2 Bytes | n Bytes |
Zeitstempel |
---|
8 Byte |
FixHeader【1 Byte】
client_type | Nachrichtentyp |
---|---|
hohe 4bit | niedrige 4bit |
ConnectionState [n Byte]
Benutzerlänge | Passwortlänge | Benutzer | Passwort |
---|---|---|---|
1 Byte | 1 Byte | n Bytes | n Bytes |
FixHeader【1 Byte】
client_type | Nachrichtentyp |
---|---|
hohe 4bit | niedrige 4bit |
ConnectionState [n Byte]
Benutzerlänge | Benutzer |
---|---|
1 Byte | n Bytes |
FixHeader【1 Byte】
client_type | Nachrichtentyp |
---|---|
hohe 4bit | niedrige 4bit |