Примечание | Разработка Chronicle-Network перенесена в закрытый исходный код, корпоративные клиенты могут получить доступ к репозиторию с закрытым исходным кодом здесь. |
Chronicle Network — это высокопроизводительная сетевая библиотека.
Эта библиотека спроектирована так, чтобы обеспечить меньшую задержку и поддерживать более высокую пропускную способность за счет использования методов, используемых в торговых системах с низкой задержкой.
Сеть Chronicle использует TCP.
Планируемая поддержка
Общая память
Поддержку UDP можно найти в Chronicle Network Enterprise (коммерческий продукт — обращайтесь по адресу [email protected]).
Клиент отправляет сообщение серверу, сервер немедленно отвечает тем же сообщением клиенту. Полный исходный код этого примера можно найти по адресу:
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
Ниже приведены более подробные объяснения некоторых ключевых частей этого кода.
TCPRegistry
наиболее полезен для модульных тестов. Он позволяет вам либо указать настоящий хост и порт, скажем, «localhost:8080», либо, если вы предпочитаете, чтобы приложение выделяло вам свободный порт случайным образом, вы можете просто предоставить текстовую ссылку. для порта, например «host.port», вы можете указать любой текст, который захотите. Он всегда будет использоваться в качестве ссылки. То есть, если он не сформирован правильно, например «имя хоста:порт», тогда он будет использовать именно тот хост и порт, которые вы указали. Причина, по которой мы предлагаем эту функциональность, заключается в том, что довольно часто в модульных тестах вы хотите запустить тест через петлю, за которым часто следует другой тест, и если первый тест не завершается правильно, это может повлиять на второй тест. Одним из решений является предоставление каждому тесту уникального порта, но тогда управление этими портами само по себе может стать проблемой. Итак, мы создали TCPRegistry
, который управляет этими портами для вас. Когда вы приступаете к очистке в конце каждого теста, все, что вам нужно сделать, это вызвать TCPRegistry.reset()
, и это гарантирует, что все открытые порты будут закрыты.
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
Сервер настроен с помощью TextWire
, поэтому клиент также должен быть настроен с помощью TextWire
. Порт, который мы будем использовать, будет (в этом примере) определен TCPRegistry
, конечно, в реальной производственной среде вы можете решить не использовать TCPRegistry
или, если вы все еще используете TCPRegistry
вы можете использовать фиксированный хост:порт.
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
Код сервера, обрабатывающий сообщение:
В этом простом примере мы получаем и обновляем сообщение, а затем немедленно отправляем обратно ответ, однако существуют и другие решения, которые можно реализовать с помощью Chronicle Network, например, сервер позже отвечает на подписку клиента.
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
Клиентский код, создающий TcpChannelHub
:
TcpChannelHub
используется для отправки ваших сообщений на сервер и последующего чтения ответа серверов. TcpChannelHub
гарантирует, что каждый ответ направляется обратно в соответствующий клиентский поток. Это делается за счет использования уникального идентификатора транзакции (мы называем этот идентификатор транзакции «tid»), когда сервер отвечает клиенту, ожидается, что сервер отправит обратно tid в качестве самого первого поля в сообщении. TcpChannelHub
просматривает каждое сообщение и читает tid, а затем направляет сообщение в соответствующий клиентский поток.
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
В этом примере мы не реализуем поддержку аварийного переключения, поэтому используется простой SocketAddressSupplier.uri(desc)
.
Создает сообщение, которое клиент отправляет на сервер
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
Если у вас есть несколько клиентских потоков, важно заблокировать их перед записью данных в сокет.
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
Чтобы правильный ответ мог быть отправлен в вашу тему, вам необходимо указать tid.
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
По умолчанию сервер Chronicle Network использует один поток для обработки всех сообщений. Однако если вы хотите посвятить каждое клиентское соединение отдельному потоку, вы можете изменить стратегию потоковой обработки сервера следующим образом:
-DServerThreadingStrategy=СОВРЕМЕННО
более подробную информацию см. в следующем перечислении net.openhft.chronicle.network.ServerThreadingStrategy
Для этой библиотеки требуется Java 8 или Java 11.
Целевая среда должна поддерживать TCP через 10 Gigabit Ethernet. При тестировании прототипа эта библиотека имеет вдвое меньшую задержку и поддерживает на 30 % большую пропускную способность.
Ключевой тест заключается в том, что сборку мусора не следует выполнять более одного раза (чтобы обеспечить разогрев) с помощью -mx64m.
Это происходит за счет масштабируемости для большого количества соединений. В этой ситуации эта библиотека должна работать как минимум так же хорошо, как Netty.
Netty имеет гораздо более широкий диапазон функциональных возможностей, однако в своей работе он создает некоторый мусор (меньше, чем при использовании простых селекторов NIO) и не предназначен для поддержки ожидания при занятости, что приводит к небольшой, но значительной задержке.