메모 | Chronicle-Network 개발이 비공개 소스로 전환되었으며, 기업 고객은 여기에서 비공개 소스 저장소에 액세스할 수 있습니다. |
Chronicle Network는 고성능 네트워크 라이브러리입니다.
이 라이브러리는 대기 시간이 짧은 거래 시스템에 사용되는 기술을 사용하여 대기 시간을 줄이고 더 높은 처리량을 지원하도록 설계되었습니다.
Chronicle 네트워크는 TCP를 사용합니다.
다음에 대한 지원 계획
공유 메모리
UDP 지원은 Chronicle Network Enterprise에서 찾을 수 있습니다(상용 제품 - [email protected]에 문의).
클라이언트가 서버에 메시지를 보내면 서버는 즉시 동일한 메시지로 클라이언트에 응답합니다. 이 예제의 전체 소스 코드는 다음에서 찾을 수 있습니다.
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
다음은 이 코드의 주요 부분 중 일부를 좀 더 자세히 설명합니다.
TCPRegistry
는 단위 테스트에 가장 유용합니다. "localhost:8080"과 같이 실제 호스트와 포트를 제공하거나 응용 프로그램이 무료 포트를 무작위로 할당하도록 하려면 텍스트 참조만 제공하면 됩니다. "host.port"와 같은 포트에 원하는 텍스트를 제공할 수 있습니다. 항상 참고자료로 활용하겠습니다. 즉, "호스트 이름:포트"와 같은 형식이 올바르게 지정되지 않은 경우에는 사용자가 제공한 정확한 호스트와 포트를 사용하게 됩니다. 우리가 이 기능을 제공하는 이유는 루프백을 통해 테스트를 시작하려는 단위 테스트에서 종종 다른 테스트가 뒤따르기 때문입니다. 첫 번째 테스트가 올바르게 종료되지 않으면 두 번째 테스트에 영향을 미칠 수 있습니다. 각 테스트에 고유한 포트를 제공하는 것이 하나의 솔루션이지만 해당 포트를 관리하는 것 자체가 문제가 될 수 있습니다. 그래서 우리는 이러한 포트를 관리하는 TCPRegistry
만들었습니다. 각 테스트가 끝날 때 정리할 때 TCPRegistry.reset()
호출하기만 하면 모든 열려 있는 포트가 닫히게 됩니다.
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
서버는 TextWire
로 구성되므로 클라이언트도 TextWire
로 구성해야 합니다. 우리가 사용할 포트는 (이 예에서는) TCPRegistry
에 의해 결정됩니다. 물론 실제 프로덕션 환경에서는 TCPRegistry
를 사용하지 않기로 결정할 수도 있고 여전히 TCPRegistry
사용하는 경우 고정된 호스트:포트를 사용할 수 있습니다.
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
메시지를 처리하는 서버 코드:
이 간단한 예시에서는 메시지를 수신하고 업데이트한 후 즉시 응답을 다시 보냅니다. 그러나 나중에 클라이언트 구독에 응답하는 서버와 같이 Chronicle Network를 사용하여 구현할 수 있는 다른 솔루션도 있습니다.
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
TcpChannelHub
를 생성하는 클라이언트 코드:
TcpChannelHub
는 메시지를 서버에 보낸 다음 서버 응답을 읽는 데 사용됩니다. TcpChannelHub
각 응답이 적절한 클라이언트 스레드로 다시 마샬링되도록 합니다. 이는 고유한 트랜잭션 ID(이 트랜잭션 ID를 "tid"라고 함)를 사용하여 이를 수행합니다. 서버가 클라이언트에 응답할 때 서버는 tid를 메시지의 첫 번째 필드로 다시 보낼 것으로 예상됩니다. TcpChannelHub
각 메시지를 보고 tid를 읽은 다음 메시지를 적절한 클라이언트 스레드로 마샬링합니다.
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
이 예에서는 장애 조치 지원을 구현하지 않으므로 간단한 SocketAddressSupplier.uri(desc)
사용됩니다.
클라이언트가 서버에 보내는 메시지를 생성합니다.
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
여러 클라이언트 스레드가 있는 경우 소켓에 데이터를 쓰기 전에 잠그는 것이 중요합니다.
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
스레드에 올바른 응답을 보내려면 tid를 지정해야 합니다.
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
기본적으로 Chronicle Network 서버는 단일 스레드를 사용하여 모든 메시지를 처리합니다. 그러나 각 클라이언트 연결을 자체 스레드 전용으로 지정하려면 서버 스레딩 전략을 다음과 같이 변경할 수 있습니다.
-DServerThreadingStrategy=동시
자세한 내용은 다음 열거형을 참조하세요. net.openhft.chronicle.network.ServerThreadingStrategy
이 라이브러리에는 Java 8 또는 Java 11이 필요합니다.
대상 환경은 10기가비트 이더넷을 통해 TCP를 지원하는 것입니다. 프로토타입 테스트에서 이 라이브러리는 대기 시간이 절반이고 30% 더 많은 대역폭을 지원합니다.
핵심 테스트는 -mx64m을 사용하여 워밍업을 허용하기 위해 두 번 이상 GC를 수행해서는 안 된다는 것입니다.
이는 많은 수의 연결에 대한 확장성을 희생합니다. 이런 상황에서 이 라이브러리는 최소한 Netty만큼의 성능을 발휘해야 합니다.
Netty는 훨씬 더 넓은 범위의 기능을 가지고 있지만 작동 시 약간의 쓰레기를 생성하고(일반 NIO 선택기를 사용하는 것보다 적음) 작지만 상당한 지연을 제공하는 바쁜 대기를 지원하도록 설계되지 않았습니다.