筆記 | Chronicle-Network的開發已移至閉源,企業客戶可以在此處存取閉源儲存庫 |
Chronicle Network 是一個高效能網路庫。
該庫旨在透過採用低延遲交易系統中使用的技術來降低延遲並支援更高的吞吐量。
客戶端向伺服器發送訊息,伺服器立即以相同的訊息回應客戶端。此範例的完整原始程式碼可以在以下位置找到:
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
下面更詳細地解釋了此程式碼的一些關鍵部分。
TCPRegistry
對於單元測試最有用,它允許您提供真實的主機和端口,例如“localhost:8080”,或者如果您想讓應用程式隨機為您分配一個空閒端口,您可以只提供文字引用對於端口,例如“ host.port”,您可以提供任何您想要的文字。它將始終被視為參考。也就是說,除非它的格式正確,例如“主機名稱:連接埠”,否則它將使用您提供的確切主機和連接埠。我們提供此功能的原因是,在單元測試中,您通常希望透過環回啟動測試,然後通常進行另一個測試,如果第一個測試未正確關閉,則可能會影響第二個測試。為每個測試提供一個唯一的連接埠是一種解決方案,但管理這些連接埠本身可能會成為一個問題。因此,我們創建了TCPRegistry
來為您管理這些端口,當您在每次測試結束時進行清理時,您所要做的就是調用TCPRegistry.reset()
,這將確保所有打開的端口都將關閉。
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
伺服器配置有TextWire
,因此客戶端也必須配置TextWire
。我們將使用的連接埠(在本例中)由TCPRegistry
確定,當然,在現實生活中的生產環境中,您可能決定不使用TCPRegistry
,或者如果您仍然使用TCPRegistry
則可以使用固定的主機:連接埠。
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
處理訊息的伺服器代碼:
在這個簡單的範例中,我們接收並更新訊息,然後立即發迴回應,但可以使用 Chronicle Network 實作其他解決方案,例如伺服器稍後回應客戶端訂閱。
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
建立TcpChannelHub
的客戶端程式碼:
TcpChannelHub
用於將訊息傳送到伺服器,然後讀取伺服器回應。 TcpChannelHub
確保每個回應都被編組回適當的客戶端執行緒。它透過使用唯一的事務ID(我們將此事務ID稱為「tid」)來實現這一點,當伺服器回應客戶端時,伺服器期望伺服器將tid作為訊息中的第一個欄位發回。 TcpChannelHub
將查看每個訊息並讀取 tid,然後將訊息編組到適當的客戶端執行緒上。
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
在此範例中,我們沒有實作故障轉移支持,因此使用簡單的SocketAddressSupplier.uri(desc)
。
建立客戶端傳送到伺服器的訊息
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
當您有多個客戶端執行緒時,在將資料寫入套接字之前鎖定非常重要。
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
為了將正確的回覆傳送到您的線程,您必須指定 tid。
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
預設情況下,Chronicle Network 伺服器使用單一執行緒來處理所有訊息。但是,如果您希望將每個客戶端連接專用於自己的線程,則可以將伺服器線程原則變更為:
-DServerThreadingStrategy=並發
有關更多詳細信息,請參閱以下枚舉net.openhft.chronicle.network.ServerThreadingStrategy
該函式庫需要 Java 8 或 Java 11。
目標環境是支援 10 Gigabit 乙太網路上的 TCP。在原型測試中,該庫的延遲降低了一半,並支援多出 30% 的頻寬。
一個關鍵測試是它不應該使用 -mx64m 進行多次 GC(以允許預熱)。
這是以大量連接的可擴展性為代價的。在這種情況下,這個函式庫的效能至少應該與 Netty 一樣好。
Netty 具有更廣泛的功能,但是它在操作中產生了一些垃圾(比使用普通 NIO 選擇器要少),並且不支援繁忙等待,這會導致少量但顯著的延遲。