Rmessage — это API-интерфейс отправки сообщений в режиме реального времени, созданный на основе проекта реактора-нетти с использованием Reactor3.
Что такое Реактор3?
Reactor — это полностью неблокирующая среда реактивного программирования для JVM с возможностями эффективного управления требованиями (т. е. контроля «противодавления»). Он напрямую интегрируется с функциональными API Java 8, такими как CompletableFuture, Stream и Duration. Он предоставляет API асинхронной последовательности Flux (для элементов [N]) и Mono (для элементов [0 | 1]), а также полностью соответствует и реализует «Спецификацию реактивных расширений».
Каковы преимущества использования Reactor?
Очень легко создавать чистый асинхронный код с высокой пропускной способностью, и он может легко интегрировать проекты Spring5 [webflux].
Чтобы использовать Rmessage, вам необходимо внешне управлять отношениями между группами пользователей, а автономное хранилище сообщений не обеспечивает постоянство. Тесты могут использовать память обработчика по умолчанию для хранения автономных сообщений. Весь проект разработан с использованием чисто идей асинхронного программирования с целью изучения реактивного программирования.
ServerStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. onReadIdle ( 10000l ) //设置读心跳时间
. onWriteIdle ( 10000l ) //设置写心跳时间
. option ( ChannelOption . SO_RCVBUF , 1023 )
. interceptor ( frame -> frame , frame -> frame ) // 拦截所有message
. setAfterChannelInit ( channel -> { // channel设置
})
. connect ()
. cast ( TcpServerSession . class )
. subscribe ( session ->{
session . addGroupHandler ( groupId -> null ). subscribe ();
session . addOfflineHandler ( new DefaultOffMessageHandler ()). subscribe ();
session . addUserHandler ( new DefaultUserTransportHandler ());
});
ClientStart
. builder ()
. tcp ()
. ip ( "127.0.0.1" )
. port ( 1888 )
. userId ( "21344" ) //设置用户名
. password ( "12312" ) //设置密码
. onReadIdle ( 10000l ,()->()-> System . out . println ( "心跳了" )) //设置读心跳,以及设置回调runner
. setClientType ( ClientType . Ios ) //设置客户端类型
. setAfterChannelInit ( channel -> {
// channel设置
})
. connect ()
. cast ( TcpClientSession . class )
. subscribe ( session ->{
session . sendPoint ( "123" , "测试一下哦" ). subscribe (); //发送单聊消息
session . sendGroup ( "group1" , "123" ). subscribe (); // 发送群聊消息
session . accept ( message -> {
}); // 接受所有消息
});
FixHeader【1 байт】
тип_клиента | тип_сообщения |
---|---|
высокий 4бит | низкий 4бит |
Тема 【n байт】
от длины | по длине | от | к |
---|---|---|---|
1 байт | 1 байт | n байт | n байт |
Тело [n байт]
длина тела | тело |
---|---|
2 байта | n байт |
временная метка |
---|
8 байт |
FixHeader【1 байт】
тип_клиента | тип_сообщения |
---|---|
высокий 4бит | низкий 4бит |
ConnectionState [n байт]
длина пользователя | длина пароля | пользователь | пароль |
---|---|---|---|
1 байт | 1 байт | n байт | n байт |
FixHeader【1 байт】
тип_клиента | тип_сообщения |
---|---|
высокий 4бит | низкий 4бит |
ConnectionState [n байт]
длина пользователя | пользователь |
---|---|
1 байт | n байт |
FixHeader【1 байт】
тип_клиента | тип_сообщения |
---|---|
высокий 4бит | низкий 4бит |