Rmessage
1.0.0
Rmessage是采用Reactor3,基于reactor-netty项目构建的实时消息推送api。
什么是Reactor3?
Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。
使用Reactor好处?
非常容易构建高吞吐量纯异步的代码,还有就是能够无缝整合spring5[webflux]项目。
使用Rmessage你需要外部管理群组用户关系,以及离线消息存储,Rmessage不提供持久化,测试可以使用默认Handler内存保留离线消息。 整个项目采用纯异步的编程思想去开发,旨在学习reactive programming。
ServerStart
.builder()
.tcp()
.ip("127.0.0.1")
.port(1888)
.onReadIdle(10000l) //设置读心跳时间
.onWriteIdle(10000l) //设置写心跳时间
.option(ChannelOption.SO_RCVBUF,1023)
.interceptor(frame -> frame,frame -> frame)// 拦截所有message
.setAfterChannelInit(channel -> {// channel设置
})
.connect()
.cast(TcpServerSession.class)
.subscribe(session->{
session.addGroupHandler(groupId -> null).subscribe();
session.addOfflineHandler(new DefaultOffMessageHandler()).subscribe();
session.addUserHandler(new DefaultUserTransportHandler());
});
ClientStart
.builder()
.tcp()
.ip("127.0.0.1")
.port(1888)
.userId("21344") //设置用户名
.password("12312") //设置密码
.onReadIdle(10000l,()->()->System.out.println("心跳了"))//设置读心跳,以及设置回调runner
.setClientType(ClientType.Ios)//设置客户端类型
.setAfterChannelInit(channel -> {
// channel设置
})
.connect()
.cast(TcpClientSession.class)
.subscribe(session->{
session.sendPoint("123","测试一下哦").subscribe(); //发送单聊消息
session.sendGroup("group1","123").subscribe(); // 发送群聊消息
session.accept(message -> {
}); // 接受所有消息
});
FixHeader 【1 byte】
client_type | message_type |
---|---|
high 4bit | low 4bit |
Topic 【n byte】
from length | to length | from | to |
---|---|---|---|
1byte | 1byte | n byte | n byte |
Body 【n byte】
body length | body |
---|---|
2 byte | n byte |
timstamp |
---|
8 byte |
FixHeader 【1 byte】
client_type | message_type |
---|---|
high 4bit | low 4bit |
ConnectionState 【n byte】
user length | password length | user | password |
---|---|---|---|
1byte | 1byte | n byte | n byte |
FixHeader 【1 byte】
client_type | message_type |
---|---|
high 4bit | low 4bit |
ConnectionState 【n byte】
user length | user |
---|---|
1byte | n byte |
FixHeader 【1 byte】
client_type | message_type |
---|---|
high 4bit | low 4bit |