笔记 | Chronicle-Network的开发已移至闭源,企业客户可以在此处访问闭源存储库 |
Chronicle Network 是一个高性能网络库。
该库旨在通过采用低延迟交易系统中使用的技术来降低延迟并支持更高的吞吐量。
客户端向服务器发送消息,服务器立即用相同的消息响应客户端。该示例的完整源代码可以在以下位置找到:
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
下面更详细地解释了此代码的一些关键部分。
TCPRegistry
对于单元测试最有用,它允许您提供真实的主机和端口,例如“localhost:8080”,或者如果您想让应用程序随机为您分配一个空闲端口,您可以只提供文本引用对于端口,例如“host.port”,您可以提供任何您想要的文本。它将始终被视为参考。也就是说,除非它的格式正确,如“主机名:端口”,否则它将使用您提供的确切主机和端口。我们提供此功能的原因是,在单元测试中,您通常希望通过环回启动测试,然后通常进行另一个测试,如果第一个测试未正确关闭,则可能会影响第二个测试。为每个测试提供一个唯一的端口是一种解决方案,但管理这些端口本身可能会成为一个问题。因此,我们创建了TCPRegistry
来为您管理这些端口,当您在每次测试结束时进行清理时,您所要做的就是调用TCPRegistry.reset()
,这将确保所有打开的端口都将被关闭。
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
服务器配置有TextWire
,因此客户端也必须配置有TextWire
。我们将使用的端口(在本例中)由TCPRegistry
确定,当然,在现实生活中的生产环境中,您可能决定不使用TCPRegistry
,或者如果您仍然使用TCPRegistry
则可以使用固定的主机:端口。
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
处理消息的服务器代码:
在这个简单的示例中,我们接收并更新消息,然后立即发回响应,但是可以使用 Chronicle Network 实现其他解决方案,例如服务器稍后响应客户端订阅。
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
创建TcpChannelHub
的客户端代码:
TcpChannelHub
用于将消息发送到服务器,然后读取服务器响应。 TcpChannelHub
确保每个响应都被编组回适当的客户端线程。它通过使用唯一的事务ID(我们将此事务ID称为“tid”)来实现这一点,当服务器响应客户端时,服务器期望服务器将tid作为消息中的第一个字段发回。 TcpChannelHub
将查看每条消息并读取 tid,然后将消息编组到适当的客户端线程上。
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
在此示例中,我们没有实现故障转移支持,因此使用简单的SocketAddressSupplier.uri(desc)
。
创建客户端发送到服务器的消息
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
当您有多个客户端线程时,在将数据写入套接字之前锁定非常重要。
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
为了将正确的回复发送到您的线程,您必须指定 tid。
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
默认情况下,Chronicle Network 服务器使用单个线程来处理所有消息。但是,如果您希望将每个客户端连接专用于其自己的线程,则可以将服务器线程策略更改为:
-DServerThreadingStrategy=并发
有关更多详细信息,请参阅以下枚举net.openhft.chronicle.network.ServerThreadingStrategy
该库需要 Java 8 或 Java 11。
目标环境是支持 10 Gigabit 以太网上的 TCP。在原型测试中,该库的延迟降低了一半,并支持多出 30% 的带宽。
一个关键测试是它不应该使用 -mx64m 进行多次 GC(以允许预热)。
这是以大量连接的可扩展性为代价的。在这种情况下,这个库的性能至少应该与 Netty 一样好。
Netty 具有更广泛的功能,但是它在操作中产生了一些垃圾(比使用普通 NIO 选择器要少),并且不支持繁忙等待,这会导致少量但显着的延迟。