Catatan | Pengembangan Chronicle-Network telah dipindahkan ke sumber tertutup, pelanggan perusahaan dapat mengakses repositori sumber tertutup di sini |
Chronicle Network adalah perpustakaan jaringan berkinerja tinggi.
Pustaka ini dirancang dengan latensi lebih rendah dan mendukung throughput lebih tinggi dengan menggunakan teknik yang digunakan dalam sistem perdagangan latensi rendah.
Jaringan Chronicle menggunakan TCP.
Dukungan yang direncanakan untuk
Memori Bersama
Dukungan UDP dapat ditemukan di Chronicle Network Enterprise (produk komersial - hubungi [email protected])
Klien mengirimkan pesan ke server, server segera membalas dengan pesan yang sama kembali ke klien. Kode sumber lengkap dari contoh ini dapat ditemukan di:
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
Di bawah ini adalah beberapa bagian penting dari kode ini yang dijelaskan secara lebih rinci.
TCPRegistry
paling berguna untuk pengujian unit, memungkinkan Anda menyediakan host dan port yang sebenarnya, misalnya "localhost:8080" atau jika Anda lebih suka membiarkan aplikasi mengalokasikan port gratis secara acak, Anda cukup memberikan referensi teks ke port, seperti, "host.port", Anda dapat memberikan teks apa pun yang Anda inginkan. Itu akan selalu dijadikan referensi. Kecuali jika bentuknya benar seperti "nama host: port", maka ia akan menggunakan host dan port yang sama persis dengan yang Anda berikan. Alasan kami menawarkan fungsi ini cukup sering dalam pengujian unit Anda ingin memulai pengujian melalui loopback, sering kali diikuti dengan pengujian lain, jika pengujian pertama tidak ditutup dengan benar, hal ini dapat berdampak pada pengujian kedua. Memberikan port unik pada setiap pengujian adalah salah satu solusinya, namun mengelola port tersebut dapat menjadi masalah tersendiri. Jadi kami membuat TCPRegistry
yang mengelola port tersebut untuk Anda, ketika Anda datang untuk membersihkan di akhir setiap pengujian, yang harus Anda lakukan hanyalah memanggil TCPRegistry.reset()
dan ini akan memastikan bahwa semua port yang terbuka akan ditutup.
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
Server dikonfigurasi dengan TextWire
, sehingga klien juga harus dikonfigurasi dengan TextWire
. Port yang akan kita gunakan akan (dalam contoh ini) ditentukan oleh TCPRegistry
, tentu saja dalam lingkungan produksi kehidupan nyata Anda dapat memutuskan untuk tidak menggunakan TCPRegistry
atau jika Anda masih menggunakan TCPRegistry
Anda dapat menggunakan host:port tetap.
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
Kode server yang memproses pesan:
Dalam contoh sederhana ini kami menerima dan memperbarui pesan dan kemudian segera mengirimkan kembali tanggapan, namun ada solusi lain yang dapat diterapkan menggunakan Chronicle Network, seperti server kemudian merespons langganan klien.
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
Kode klien yang membuat TcpChannelHub
:
TcpChannelHub
digunakan untuk mengirim pesan Anda ke server dan kemudian membaca respons server. TcpChannelHub
memastikan bahwa setiap respons disusun kembali ke thread klien yang sesuai. Hal ini dilakukan melalui penggunaan ID transaksi unik (kami menyebut ID transaksi ini "tid" ), ketika server merespons klien, diharapkan server mengirimkan kembali tid sebagai kolom pertama dalam pesan. TcpChannelHub
akan melihat setiap pesan dan membaca arusnya, lalu menyusun pesan tersebut ke thread klien yang sesuai.
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
Dalam contoh ini kami tidak menerapkan dukungan failover, sehingga SocketAddressSupplier.uri(desc)
yang sederhana digunakan.
Membuat pesan yang dikirim klien ke server
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
Bila Anda memiliki beberapa thread klien, penting untuk menguncinya sebelum menulis data ke soket.
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
Agar balasan yang benar dapat dikirim ke thread Anda, Anda harus menentukan waktunya.
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
Secara default, server Chronicle Network menggunakan satu thread untuk memproses semua pesan. Namun, jika Anda ingin mendedikasikan setiap koneksi klien ke threadnya sendiri, maka Anda dapat mengubah strategi threading server, menjadi:
-DServerThreadingStrategy=BERSAMAAN
lihat enum berikut untuk lebih jelasnya net.openhft.chronicle.network.ServerThreadingStrategy
Pustaka ini memerlukan Java 8 atau Java 11.
Lingkungan targetnya adalah mendukung TCP melalui 10 Gigabit Ethernet. Dalam pengujian prototipe, perpustakaan ini memiliki setengah latensi dan mendukung bandwidth 30% lebih banyak.
Tes utamanya adalah tidak boleh melakukan GC lebih dari sekali (untuk memungkinkan pemanasan) dengan -mx64m.
Hal ini mengorbankan skalabilitas untuk koneksi dalam jumlah besar. Dalam situasi ini, perpustakaan ini setidaknya harus bekerja sama baiknya dengan Netty.
Netty memiliki fungsionalitas yang jauh lebih luas, namun ia menimbulkan beberapa sampah dalam pengoperasiannya (lebih sedikit dibandingkan menggunakan NIO Selector biasa) dan tidak dirancang untuk mendukung penantian sibuk yang menghasilkan penundaan kecil namun signifikan.