Nota | El desarrollo de Chronicle-Network se ha trasladado a código cerrado; los clientes empresariales pueden acceder al repositorio de código cerrado aquí |
Chronicle Network es una biblioteca de red de alto rendimiento.
Esta biblioteca está diseñada para tener una latencia más baja y admitir un mayor rendimiento mediante el empleo de técnicas utilizadas en sistemas comerciales de baja latencia.
Chronicle Network utiliza TCP.
Apoyo planificado para
Memoria compartida
El soporte UDP se puede encontrar en Chronicle Network Enterprise (producto comercial: comuníquese con [email protected])
El cliente envía un mensaje al servidor, el servidor responde inmediatamente con el mismo mensaje al cliente. El código fuente completo de este ejemplo se puede encontrar en:
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
A continuación se explican algunas de las partes clave de este código, con un poco más de detalle.
TCPRegistry
es más útil para pruebas unitarias, le permite proporcionar un host y un puerto verdaderos, digamos "localhost:8080" o, si prefiere dejar que la aplicación le asigne un puerto libre al azar, puede simplemente proporcionar una referencia de texto. al puerto, como "host.port", puede proporcionar el texto que desee. Siempre se tomará como referencia. Es decir, a menos que esté formado correctamente como "nombre de host: puerto", utilizará el host y el puerto exactos que usted proporcione. La razón por la que ofrecemos esta funcionalidad es que, muy a menudo, en las pruebas unitarias desea iniciar una prueba mediante loopback, seguida a menudo por otra prueba; si la primera prueba no se cierra correctamente, puede afectar a la segunda prueba. Darle a cada prueba un puerto único es una solución, pero luego administrar esos puertos puede convertirse en un problema en sí mismo. Así que creamos TCPRegistry
que administra esos puertos por usted, cuando llegue a limpiar al final de cada prueba, todo lo que tiene que hacer es llamar TCPRegistry.reset()
y esto garantizará que cualquier puerto abierto se cierre.
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
El servidor está configurado con TextWire
, por lo que el cliente también debe estar configurado con TextWire
. El puerto que usaremos estará (en este ejemplo) determinado por TCPRegistry
; por supuesto, en un entorno de producción de la vida real puede decidir no usar TCPRegistry
o, si todavía usa TCPRegistry
puede usar un host fijo: puerto.
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
El código del servidor que procesa un mensaje:
En este ejemplo simple, recibimos y actualizamos un mensaje y luego enviamos inmediatamente una respuesta; sin embargo, existen otras soluciones que se pueden implementar usando Chronicle Network, como que el servidor responda más tarde a la suscripción de un cliente.
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
El código de cliente que crea TcpChannelHub
:
TcpChannelHub
se utiliza para enviar sus mensajes al servidor y luego leer la respuesta del servidor. TcpChannelHub
garantiza que cada respuesta se vuelva a ordenar en el hilo del cliente apropiado. Lo hace mediante el uso de un ID de transacción único (llamamos a este ID de transacción "tid"), cuando el servidor responde al cliente, se espera que el servidor devuelva el tid como el primer campo del mensaje. TcpChannelHub
examinará cada mensaje, leerá el tid y luego ordenará el mensaje en el hilo de cliente correspondiente.
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
En este ejemplo no estamos implementando soporte de conmutación por error, por lo que se utiliza el SocketAddressSupplier.uri(desc)
simple.
Crea el mensaje que el cliente envía al servidor.
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
Cuando tiene varios subprocesos de cliente, es importante bloquearlos antes de escribir los datos en el socket.
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
Para que se pueda enviar la respuesta correcta a su hilo, debe especificar el tid.
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
De forma predeterminada, el servidor de Chronicle Network utiliza un único hilo para procesar todos los mensajes. Sin embargo, si desea dedicar cada conexión de cliente a su propio hilo, puede cambiar la estrategia de hilo del servidor para:
-DServerThreadingStrategy=CONCURRENTE
consulte la siguiente enumeración para obtener más detalles net.openhft.chronicle.network.ServerThreadingStrategy
Esta biblioteca requiere Java 8 o Java 11.
El entorno de destino es admitir TCP sobre 10 Gigabit Ethernet. En pruebas de prototipos, esta biblioteca tiene la mitad de latencia y admite un 30% más de ancho de banda.
Una prueba clave es que no se debe realizar GC más de una vez (para permitir el calentamiento) con -mx64m.
Esto tiene el costo de la escalabilidad para una gran cantidad de conexiones. En esta situación, esta biblioteca debería funcionar al menos tan bien como Netty.
Netty tiene una gama mucho más amplia de funcionalidades, sin embargo, crea algo de basura en su funcionamiento (menos que usar selectores NIO simples) y no está diseñado para soportar esperas ocupadas, lo que produce un retraso pequeño pero significativo.