Observação | O desenvolvimento do Chronicle-Network foi movido para código fechado, os clientes corporativos podem acessar o repositório de código fechado aqui |
Chronicle Network é uma biblioteca de rede de alto desempenho.
Esta biblioteca foi projetada para ter menor latência e suportar maior rendimento, empregando técnicas usadas em sistemas de negociação de baixa latência.
A Chronicle Network usa TCP.
Apoio planejado para
Memória Compartilhada
O suporte UDP pode ser encontrado no Chronicle Network Enterprise (produto comercial - entre em contato com [email protected])
O cliente envia uma mensagem ao servidor, o servidor responde imediatamente com a mesma mensagem ao cliente. O código fonte completo deste exemplo pode ser encontrado em:
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
Abaixo estão algumas das partes principais deste código explicadas, com um pouco mais de detalhes.
O TCPRegistry
é mais útil para testes de unidade, pois permite que você forneça um host e uma porta verdadeiros, digamos "localhost:8080" ou se preferir deixar o aplicativo alocar uma porta livre aleatoriamente, você pode apenas fornecer uma referência de texto à porta, como "host.port", você pode fornecer qualquer texto que desejar. Será sempre tomado como referência. Isto é, a menos que esteja corretamente formado como "hostname:port", então ele usará o host e a porta exatos que você forneceu. A razão pela qual oferecemos esta funcionalidade é que muitas vezes em testes de unidade você deseja iniciar um teste via loopback, seguido muitas vezes por outro teste, se o primeiro teste não for encerrado corretamente, isso poderá impactar no segundo teste. Dar a cada teste uma porta exclusiva é uma solução, mas o gerenciamento dessas portas pode se tornar um problema por si só. Então criamos o TCPRegistry
que gerencia essas portas para você, quando você vier fazer a limpeza ao final de cada teste, basta chamar TCPRegistry.reset()
e isso garantirá que todas as portas abertas serão fechadas.
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
O servidor está configurado com TextWire
, portanto o cliente também deve estar configurado com TextWire
. A porta que usaremos será (neste exemplo) determinada pelo TCPRegistry
, é claro que em um ambiente de produção da vida real você pode decidir não usar o TCPRegistry
ou se ainda usar o TCPRegistry
você pode usar um host:port fixo.
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
O código do servidor que processa uma mensagem:
Neste exemplo simples, recebemos e atualizamos uma mensagem e imediatamente enviamos uma resposta, porém existem outras soluções que podem ser implementadas usando Chronicle Network, como o servidor responder posteriormente a uma assinatura de cliente.
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
O código do cliente que cria o TcpChannelHub
:
O TcpChannelHub
é usado para enviar suas mensagens ao servidor e então ler a resposta do servidor. O TcpChannelHub
garante que cada resposta seja empacotada de volta no thread do cliente apropriado. Isso é feito através do uso de um ID de transação exclusivo (chamamos esse ID de transação de "tid"). Quando o servidor responde ao cliente, espera-se que o servidor envie de volta o tid como o primeiro campo da mensagem. O TcpChannelHub
examinará cada mensagem e lerá o tid e, em seguida, organizará a mensagem no thread do cliente apropriado.
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
Neste exemplo, não estamos implementando suporte a failover, portanto, o simples SocketAddressSupplier.uri(desc)
é usado.
Cria a mensagem que o cliente envia ao servidor
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
Quando você tem vários threads de cliente, é importante bloquear antes de gravar os dados no soquete.
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
Para que a resposta correta possa ser enviada ao seu tópico, você deve especificar o tid.
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
Por padrão, o servidor Chronicle Network usa um único thread para processar todas as mensagens. No entanto, se desejar dedicar cada conexão do cliente ao seu próprio encadeamento, você poderá alterar a estratégia de encadeamento do servidor para:
-DServerThreadingStrategy=CONCORRENTE
consulte a enumeração a seguir para obter mais detalhes net.openhft.chronicle.network.ServerThreadingStrategy
Esta biblioteca requer Java 8 ou Java 11.
O ambiente de destino é suportar TCP em 10 Gigabit Ethernet. Nos testes de protótipo, esta biblioteca tem metade da latência e suporta 30% mais largura de banda.
Um teste importante é que ele não deve fazer GC mais de uma vez (para permitir o aquecimento) com -mx64m.
Isso tem o custo da escalabilidade para um grande número de conexões. Nessa situação, esta biblioteca deve ter um desempenho pelo menos tão bom quanto o Netty.
O Netty tem uma gama muito mais ampla de funcionalidades, porém cria algum lixo em sua operação (menos do que usar seletores NIO simples) e não foi projetado para suportar espera ocupada, o que resulta em um atraso pequeno, mas significativo.