笔记 | Chronicle-Network的开发已移至闭源,企业客户可以在此处访问闭源存储库 |
Chronicle Network 是一个高性能网络库。
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
在这个简单的示例中,我们接收并更新消息,然后立即发回响应,但是可以使用 Chronicle Network 实现其他解决方案,例如服务器稍后响应客户端订阅。
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
* Simply reads the csp,tid and payload and sends back the tid and payload
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
用于将消息发送到服务器,然后读取服务器响应。 TcpChannelHub
确保每个响应都被编组回适当的客户端线程。它通过使用唯一的事务ID(我们将此事务ID称为“tid”)来实现这一点,当服务器响应客户端时,服务器期望服务器将tid作为消息中的第一个字段发回。 TcpChannelHub
将查看每条消息并读取 tid,然后将消息编组到适当的客户端线程上。
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
为了将正确的回复发送到您的线程,您必须指定 tid。
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
默认情况下,Chronicle Network 服务器使用单个线程来处理所有消息。但是,如果您希望将每个客户端连接专用于其自己的线程,则可以将服务器线程策略更改为:
该库需要 Java 8 或 Java 11。
目标环境是支持 10 Gigabit 以太网上的 TCP。在原型测试中,该库的延迟降低了一半,并支持多出 30% 的带宽。
一个关键测试是它不应该使用 -mx64m 进行多次 GC(以允许预热)。
这是以大量连接的可扩展性为代价的。在这种情况下,这个库的性能至少应该与 Netty 一样好。
Netty 具有更广泛的功能,但是它在操作中产生了一些垃圾(比使用普通 NIO 选择器要少),并且不支持繁忙等待,这会导致少量但显着的延迟。