注記 | Chronicle-Network の開発はクローズド ソースに移行されました。企業のお客様はここからクローズド ソース リポジトリにアクセスできます。 |
Chronicle Network は、高性能ネットワーク ライブラリです。
このライブラリは、低レイテンシの取引システムで使用される手法を採用することで、レイテンシが低くなり、より高いスループットをサポートするように設計されています。
Chronicle Network は TCP を使用します。
予定されているサポート
共有メモリ
UDP サポートは Chronicle Network Enterprise (商用製品 - [email protected] にお問い合わせください) で見つかります。
クライアントがサーバーにメッセージを送信すると、サーバーはすぐに同じメッセージをクライアントに返します。この例の完全なソース コードは次の場所にあります。
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
以下に、このコードの重要な部分の一部をもう少し詳しく説明します。
TCPRegistry
単体テストに最も役立ちます。TCPRegistry を使用すると、実際のホストとポート (たとえば「localhost:8080」) を指定できます。あるいは、アプリケーションに空きポートをランダムに割り当てさせたい場合は、テキスト参照を指定するだけです。 「host.port」などのポートに、任意のテキストを入力できます。常に参考にさせていただきます。つまり、「ホスト名:ポート」のように正しく形成されていない限り、指定した正確なホストとポートが使用されます。この機能を提供する理由は、単体テストでループバック経由でテストを開始し、その後に別のテストを開始することがよくあるためです。最初のテストが正しくシャットダウンしないと、2 番目のテストに影響を与える可能性があります。各テストに固有のポートを与えることは 1 つの解決策ですが、その場合、それらのポートの管理自体が問題になる可能性があります。そこで、これらのポートを管理するTCPRegistry
を作成しました。各テストの最後にクリーンアップするときは、 TCPRegistry.reset()
呼び出すだけで済みます。これにより、開いているポートが確実に閉じられます。
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
サーバーはTextWire
を使用して構成されているため、クライアントもTextWire
を使用して構成する必要があります。使用するポートは (この例では) TCPRegistry
によって決定されます。もちろん、実際の運用環境では、 TCPRegistry
使用しないことを決定することもできます。または、まだTCPRegistry
使用する場合は、固定の host:port を使用できます。
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
メッセージを処理するサーバー コード:
この単純な例では、メッセージを受信して更新し、すぐに応答を送り返しますが、サーバーが後でクライアント サブスクリプションに応答するなど、Chronicle Network を使用して実装できる他のソリューションもあります。
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
TcpChannelHub
を作成するクライアント コード:
TcpChannelHub
、メッセージをサーバーに送信し、サーバーの応答を読み取るために使用されます。 TcpChannelHub
、各応答が適切なクライアント スレッドにマーシャリングされて戻されることを保証します。これは、一意のトランザクション ID (このトランザクション ID を「tid」と呼びます) を使用して行われ、サーバーがクライアントに応答するときに、サーバーがメッセージの最初のフィールドとして tid を送り返すことが期待されます。 TcpChannelHub
各メッセージを調べて tid を読み取り、メッセージを適切なクライアント スレッドにマーシャリングします。
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
この例では、フェイルオーバーのサポートを実装していないため、単純なSocketAddressSupplier.uri(desc)
が使用されます。
クライアントがサーバーに送信するメッセージを作成します
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
複数のクライアント スレッドがある場合は、ソケットにデータを書き込む前にロックすることが重要です。
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
正しい応答をスレッドに送信できるようにするには、tid を指定する必要があります。
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
デフォルトでは、Chronicle Network サーバーは単一のスレッドを使用してすべてのメッセージを処理します。ただし、各クライアント接続を独自のスレッド専用にしたい場合は、サーバーのスレッド化戦略を次のように変更できます。
-DServerThreadingStrategy=CONCURRENT
詳細については、次の列挙型を参照してください。 net.openhft.chronicle.network.ServerThreadingStrategy
このライブラリには Java 8 または Java 11 が必要です。
ターゲット環境は、TCP over 10 ギガビット イーサネットをサポートすることです。プロトタイプ テストでは、このライブラリのレイテンシは半分で、30% 多い帯域幅をサポートします。
重要なテストは、(ウォームアップを可能にするために) -mx64m を使用して GC を複数回実行すべきではないということです。
これには、多数の接続に対するスケーラビリティが犠牲になります。この状況では、このライブラリは少なくとも Netty と同等のパフォーマンスを発揮するはずです。
Netty ははるかに幅広い機能を備えていますが、操作中に多少のガベージが発生し (プレーンな NIO セレクターを使用するよりも少ない)、小さいながらも大幅な遅延が発生するビジー待機をサポートするように設計されていません。