Notiz | Die Entwicklung von Chronicle-Network wurde auf Closed-Source verlagert. Unternehmenskunden können hier auf das Closed-Source-Repository zugreifen |
Chronicle Network ist eine leistungsstarke Netzwerkbibliothek.
Diese Bibliothek ist auf eine geringere Latenz und einen höheren Durchsatz ausgelegt, indem Techniken eingesetzt werden, die in Handelssystemen mit niedriger Latenz verwendet werden.
Chronicle Network verwendet TCP.
Geplante Unterstützung für
Gemeinsamer Speicher
UDP-Unterstützung finden Sie in Chronicle Network Enterprise (kommerzielles Produkt – kontaktieren Sie [email protected]).
Der Client sendet eine Nachricht an den Server, der Server antwortet sofort mit derselben Nachricht an den Client zurück. Den vollständigen Quellcode dieses Beispiels finden Sie unter:
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
Im Folgenden werden einige der wichtigsten Teile dieses Codes etwas ausführlicher erläutert.
Die TCPRegistry
eignet sich besonders für Unit-Tests. Sie ermöglicht es Ihnen, entweder einen echten Host und Port anzugeben, z. B. „localhost:8080“, oder Sie können einfach eine Textreferenz angeben, wenn Sie lieber möchten, dass die Anwendung Ihnen zufällig einen freien Port zuweist B. „host.port“, können Sie einen beliebigen Text angeben. Es wird immer als Referenz verwendet. Sofern es nicht korrekt wie „Hostname:Port“ formatiert ist, werden genau der von Ihnen angegebene Host und Port verwendet. Der Grund, warum wir diese Funktionalität anbieten, ist, dass Sie bei Unit-Tests häufig einen Test über Loopback starten möchten, gefolgt von einem weiteren Test. Wenn der erste Test nicht ordnungsgemäß beendet wird, kann dies Auswirkungen auf den zweiten Test haben. Eine Lösung besteht darin, jedem Test einen eindeutigen Port zuzuweisen, aber dann kann die Verwaltung dieser Ports zu einem Problem für sich werden. Deshalb haben wir die TCPRegistry
erstellt, die diese Ports für Sie verwaltet. Wenn Sie am Ende jedes Tests zur Bereinigung kommen, müssen Sie nur TCPRegistry.reset()
aufrufen und dadurch wird sichergestellt, dass alle offenen Ports geschlossen werden.
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
Der Server ist mit TextWire
konfiguriert, daher muss der Client auch mit TextWire
konfiguriert sein. Der Port, den wir verwenden werden, wird (in diesem Beispiel) durch die TCPRegistry
bestimmt. In einer realen Produktionsumgebung können Sie sich natürlich entscheiden, die TCPRegistry
nicht zu verwenden, oder wenn Sie die TCPRegistry
trotzdem verwenden, können Sie einen festen Host:Port verwenden.
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
Der Servercode, der eine Nachricht verarbeitet:
In diesem einfachen Beispiel empfangen und aktualisieren wir eine Nachricht und senden dann sofort eine Antwort zurück. Es gibt jedoch auch andere Lösungen, die mit Chronicle Network implementiert werden können, z. B. dass der Server später auf ein Client-Abonnement antwortet.
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
Der Clientcode, der den TcpChannelHub
erstellt:
Der TcpChannelHub
wird verwendet, um Ihre Nachrichten an den Server zu senden und dann die Antwort des Servers zu lesen. Der TcpChannelHub
stellt sicher, dass jede Antwort zurück an den entsprechenden Client-Thread gemarshallt wird. Dies geschieht durch die Verwendung einer eindeutigen Transaktions-ID (wir nennen diese Transaktions-ID „tid“). Wenn der Server dem Client antwortet, wird erwartet, dass der Server die tid als allererstes Feld in der Nachricht zurücksendet. Der TcpChannelHub
prüft jede Nachricht, liest die TID und leitet die Nachricht dann an Ihren entsprechenden Client-Thread weiter.
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
In diesem Beispiel implementieren wir keine Failover-Unterstützung, daher wird die einfache SocketAddressSupplier.uri(desc)
verwendet.
Erstellt die Nachricht, die der Client an den Server sendet
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
Wenn Sie über mehrere Client-Threads verfügen, ist es wichtig, diese zu sperren, bevor Sie die Daten in den Socket schreiben.
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
Damit die richtige Antwort an Ihren Thread gesendet werden kann, müssen Sie die TID angeben.
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
Standardmäßig verwendet der Chronicle Network-Server einen einzelnen Thread, um alle Nachrichten zu verarbeiten. Wenn Sie jedoch jede Client-Verbindung einem eigenen Thread zuweisen möchten, können Sie die Server-Threading-Strategie wie folgt ändern:
-DServerThreadingStrategy=CONCURRENT
Weitere Informationen finden Sie in der folgenden Enumeration net.openhft.chronicle.network.ServerThreadingStrategy
Diese Bibliothek erfordert Java 8 oder Java 11.
Die Zielumgebung soll TCP über 10 Gigabit Ethernet unterstützen. Bei Prototyptests hat diese Bibliothek die halbe Latenz und unterstützt 30 % mehr Bandbreite.
Ein wichtiger Test besteht darin, dass mit -mx64m nicht mehr als einmal eine GC durchgeführt werden sollte (um das Aufwärmen zu ermöglichen).
Dies geht zu Lasten der Skalierbarkeit für eine große Anzahl von Verbindungen. In dieser Situation sollte diese Bibliothek mindestens so gut funktionieren wie Netty.
Netty verfügt über einen viel größeren Funktionsumfang, erzeugt jedoch etwas Müll in seinem Betrieb (weniger als die Verwendung einfacher NIO-Selektoren) und ist nicht für die Unterstützung von „Busy Waiting“ ausgelegt, was zu einer kleinen, aber erheblichen Verzögerung führt.