Note | Le développement de Chronicle-Network a été déplacé vers une source fermée, les entreprises clientes peuvent accéder au référentiel source fermée ici |
Chronicle Network est une bibliothèque réseau hautes performances.
Cette bibliothèque est conçue pour offrir une latence plus faible et prendre en charge un débit plus élevé en employant des techniques utilisées dans les systèmes de trading à faible latence.
Chronicle Network utilise TCP.
Prise en charge prévue pour
Mémoire partagée
La prise en charge UDP est disponible dans Chronicle Network Enterprise (produit commercial - contactez [email protected])
Le client envoie un message au serveur, le serveur répond immédiatement avec le même message au client. Le code source complet de cet exemple peut être trouvé à l'adresse :
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
Vous trouverez ci-dessous quelques-unes des parties clés de ce code expliquées, un peu plus en détail.
Le TCPRegistry
est très utile pour les tests unitaires, il vous permet soit de fournir un véritable hôte et un vrai port, par exemple "localhost:8080", soit si vous préférez laisser l'application vous allouer un port libre au hasard, vous pouvez simplement fournir une référence textuelle. au port, tel que "host.port", vous pouvez fournir le texte de votre choix. Il sera toujours pris comme référence. Autrement dit, à moins qu'il ne soit correctement formé comme "nom d'hôte: port", il utilisera exactement l'hôte et le port que vous fournissez. La raison pour laquelle nous proposons cette fonctionnalité est que très souvent, dans les tests unitaires, vous souhaitez démarrer un test via un bouclage, suivi souvent d'un autre test, si le premier test ne s'arrête pas correctement, cela peut avoir un impact sur le deuxième test. Attribuer à chaque test un port unique est une solution, mais la gestion de ces ports peut alors devenir un problème en soi. Nous avons donc créé le TCPRegistry
qui gère ces ports pour vous. Lorsque vous effectuez le nettoyage à la fin de chaque test, tout ce que vous avez à faire est d'appeler TCPRegistry.reset()
et cela garantira que tous les ports ouverts seront fermés.
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
Le serveur est configuré avec TextWire
, le client doit donc également être configuré avec TextWire
. Le port que nous utiliserons sera (dans cet exemple) déterminé par le TCPRegistry
, bien sûr, dans un environnement de production réel, vous pouvez décider de ne pas utiliser le TCPRegistry
ou si vous utilisez toujours le TCPRegistry
vous pouvez utiliser un hôte:port fixe.
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
Le code du serveur qui traite un message :
Dans cet exemple simple, nous recevons et mettons à jour un message, puis renvoyons immédiatement une réponse. Cependant, il existe d'autres solutions qui peuvent être mises en œuvre à l'aide de Chronicle Network, comme le serveur répondant plus tard à un abonnement client.
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
Le code client qui crée le TcpChannelHub
:
Le TcpChannelHub
est utilisé pour envoyer vos messages au serveur, puis lire la réponse du serveur. Le TcpChannelHub
garantit que chaque réponse est renvoyée sur le thread client approprié. Il le fait grâce à l'utilisation d'un identifiant de transaction unique (nous appelons cet identifiant de transaction le "tid"). Lorsque le serveur répond au client, il s'attend à ce que le serveur renvoie le tid comme tout premier champ du message. Le TcpChannelHub
examinera chaque message et lira le tid, puis regroupera le message sur votre thread client approprié.
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
Dans cet exemple, nous n'implémentons pas de prise en charge du basculement, c'est pourquoi le simple SocketAddressSupplier.uri(desc)
est utilisé.
Crée le message que le client envoie au serveur
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
Lorsque vous disposez de plusieurs threads clients, il est important de verrouiller avant d'écrire les données sur le socket.
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
Afin que la réponse correcte puisse être envoyée à votre fil de discussion, vous devez spécifier le tide.
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
Par défaut, le serveur Chronicle Network utilise un seul thread pour traiter tous les messages. Cependant, si vous souhaitez dédier chaque connexion client à son propre thread, vous pouvez modifier la stratégie de thread du serveur pour :
-DServerThreadingStrategy=CONCURRENT
voir l'énumération suivante pour plus de détails net.openhft.chronicle.network.ServerThreadingStrategy
Cette bibliothèque nécessite Java 8 ou Java 11.
L'environnement cible est de prendre en charge TCP sur 10 Gigabit Ethernet. Lors des tests de prototypes, cette bibliothèque a une latence deux fois inférieure et prend en charge 30 % de bande passante en plus.
Un test clé est qu'il ne doit pas être GC plus d'une fois (pour permettre l'échauffement) avec -mx64m.
Cela se fait au détriment de l’évolutivité pour un grand nombre de connexions. Dans cette situation, cette bibliothèque devrait fonctionner au moins aussi bien que Netty.
Netty a une gamme de fonctionnalités beaucoup plus large, mais il crée des déchets dans son fonctionnement (moins que l'utilisation de simples sélecteurs NIO) et n'est pas conçu pour prendre en charge une attente occupée, ce qui entraîne un retard faible mais significatif.