https://player.vimeo.com/video/201989439
Chronicle Queue adalah kerangka kerja pesan latensi rendah yang bertahan untuk aplikasi kinerja tinggi.
Proyek ini mencakup versi java dari Chronicle Queue. Versi C ++ dari proyek ini juga tersedia dan mendukung interoperabilitas Java/C ++ plus binding bahasa tambahan misalnya Python. Jika Anda tertarik untuk mengevaluasi versi C ++, silakan hubungi [email protected].
Pada pandangan pertama, antrian kronik dapat dilihat sebagai implementasi antrian lain . Namun, ia memiliki pilihan desain utama yang harus ditekankan. Menggunakan penyimpanan off-heap , Chronicle Queue menyediakan lingkungan di mana aplikasi tidak menderita pengumpulan sampah (GC). Saat menerapkan aplikasi berkinerja tinggi dan intensif memori (Anda mendengar istilah mewah "BigData"?) Di Java, salah satu masalah terbesar adalah pengumpulan sampah.
Antrian Chronicle memungkinkan pesan ditambahkan ke akhir antrian ("ditambahkan"), dibaca dari antrian ("tailed"), dan juga mendukung pencarian akses acak.
Anda dapat menganggap antrian Chronicle mirip dengan topik latensi-latensi-latensi-latency yang tahan lama/bertahan yang dapat berisi pesan dari berbagai jenis dan ukuran. Chronicle Queue adalah antrian persikatan yang tidak terhalang yang:
Mendukung RMI asinkron dan mempublikasikan/berlangganan antarmuka dengan latensi mikro.
menyampaikan pesan di antara JVM di bawah mikrodetik
Meneruskan pesan antara JVM pada mesin yang berbeda melalui replikasi di bawah 10 mikrodetik (fitur perusahaan)
Memberikan latensi real-time yang stabil dan lunak ke dalam jutaan pesan per detik untuk satu utas ke satu antrian; dengan pemesanan total setiap acara.
Saat menerbitkan pesan 40-byte, persentase tinggi dari waktu kami mencapai latensi di bawah 1 mikrodetik. Latensi persentil ke -99 adalah yang terburuk 1 dalam 100, dan persentil ke -99,9 adalah latensi terburuk 1 dari 1000.
Ukuran batch | 10 juta acara per menit | 60 juta acara per menit | 100 juta acara per menit |
---|---|---|---|
99%ile | 0,78 μs | 0,78 μs | 1.2 μs |
99,9%ile | 1.2 μs | 1.3 μs | 1,5 μs |
Ukuran batch | 10 juta acara per menit | 60 juta acara per menit | 100 juta acara per menit |
---|---|---|---|
99%ile | 20 μs | 28 μs | 176 μs |
99,9%ile | 901 μs | 705 μs | 5.370 μs |
Catatan | 100 juta acara per menit mengirim acara setiap 660 nanodetik; direplikasi dan bertahan. |
Penting | Kinerja ini tidak dicapai dengan menggunakan sekelompok mesin besar . Ini menggunakan satu utas untuk diterbitkan, dan satu utas untuk dikonsumsi. |
Antrian Chronicle dirancang untuk:
Jadilah "Rekam Everything Store" yang dapat dibaca dengan latensi real-time mikrodetik. Ini bahkan mendukung sistem perdagangan frekuensi tinggi yang paling menuntut. Namun, dapat digunakan dalam aplikasi apa pun di mana perekaman informasi menjadi perhatian.
Dukungan replikasi yang dapat diandalkan dengan pemberitahuan untuk appender (penulis pesan) atau penerima (pembaca pesan), ketika sebuah pesan telah berhasil direplikasi.
Chronicle Queue mengasumsikan ruang disk murah dibandingkan dengan memori. Chronicle Queue memanfaatkan sepenuhnya ruang disk yang Anda miliki, sehingga Anda tidak dibatasi oleh memori utama mesin Anda. Jika Anda menggunakan pemintalan HDD, Anda dapat menyimpan banyak TBS ruang disk dengan sedikit biaya.
Satu -satunya perangkat lunak tambahan yang perlu dijalankan antrian adalah sistem operasi. Itu tidak memiliki broker; Sebaliknya ia menggunakan sistem operasi Anda untuk melakukan semua pekerjaan. Jika aplikasi Anda mati, sistem operasi terus berjalan untuk detik lebih lama, jadi tidak ada data yang hilang; bahkan tanpa replikasi.
Ketika Chronicle Queue menyimpan semua data yang disimpan dalam file yang dipetakan memori, ini memiliki overhead on-heap yang sepele, bahkan jika Anda memiliki lebih dari 100 TB data.
Chronicle berusaha keras untuk mencapai latensi yang sangat rendah. Dalam produk lain yang fokus pada dukungan aplikasi web, latensi kurang dari 40 milidetik baik karena lebih cepat dari yang Anda lihat; Misalnya, laju bingkai bioskop adalah 24 Hz, atau sekitar 40 ms.
Chronicle Queue bertujuan untuk mencapai latensi di bawah 40 mikrodetik untuk 99% hingga 99,99% dari waktu. Menggunakan Chronicle Queue tanpa replikasi, kami mendukung aplikasi dengan latensi di bawah 40 mikrodetik dari ujung ke ujung di berbagai layanan. Seringkali latensi 99% dari antrian Chronicle sepenuhnya tergantung pada pilihan sistem operasi dan sub-sistem hard disk.
Replikasi untuk Chronicle Queue mendukung perusahaan kawat Chronicle. Ini mendukung kompresi real-time yang menghitung delta untuk masing-masing objek, seperti yang ditulis. Ini dapat mengurangi ukuran pesan dengan faktor 10, atau lebih baik, tanpa perlu batching; yaitu, tanpa memperkenalkan latensi yang signifikan.
Chronicle Queue juga mendukung kompresi LZW, Snappy, dan GZIP. Namun format ini menambah latensi yang signifikan. Ini hanya berguna jika Anda memiliki batasan ketat pada bandwidth jaringan.
Chronicle Queue mendukung sejumlah semantik:
Setiap pesan diputar ulang saat restart.
Hanya pesan baru yang dimainkan saat restart.
Restart dari titik yang diketahui menggunakan indeks entri.
Replay hanya pesan yang Anda lewatkan. Ini didukung secara langsung menggunakan pembangun MethodReader/MethodWriter.
Pada sebagian besar System.nanoTime()
sistem. Ini sama di seluruh JVM pada mesin yang sama, tetapi sangat berbeda di antara mesin. Perbedaan absolut ketika datang ke mesin tidak ada artinya. Namun, informasi tersebut dapat digunakan untuk mendeteksi outlier; Anda tidak dapat menentukan apa latensi terbaik, tetapi Anda dapat menentukan seberapa jauh latensi terbaik Anda. Ini berguna jika Anda fokus pada latensi persentil ke -99. Kami memiliki kelas yang disebut RunningMinimum
untuk mendapatkan waktu dari mesin yang berbeda, sambil mengkompensasi penyimpangan dalam nanoTime
di antara mesin. Semakin sering Anda melakukan pengukuran, semakin akurat minimum berjalan ini.
Chronicle Antrian mengelola penyimpanan berdasarkan siklus. Anda dapat menambahkan StoreFileListener
yang akan memberi tahu Anda saat file ditambahkan, dan ketika tidak lagi dipertahankan. Anda dapat memindahkan, mengompres, atau menghapus semua pesan selama sehari, sekaligus. Catatan: Sayangnya pada Windows, jika operasi IO terganggu, ia dapat menutup filechannel yang mendasarinya.
Karena alasan kinerja, kami telah menghapus pemeriksaan interupsi dalam kode antrian Chronicle. Karena itu, kami menyarankan agar Anda menghindari penggunaan antrian Chronicle dengan kode yang menghasilkan interupsi. Jika Anda tidak dapat menghindari menghasilkan interupsi maka kami sarankan Anda membuat instance terpisah dari antrian Chronicle per utas.
Chronicle Queue paling sering digunakan untuk sistem produsen-sentris di mana Anda perlu menyimpan banyak data selama berhari-hari atau bertahun-tahun. Untuk statistik, lihat Penggunaan Chronicle-Queue
Penting | Chronicle Queue tidak mendukung pengoperasian sistem file jaringan apa pun, baik itu NFS, AFS, penyimpanan berbasis SAN atau apa pun. Alasannya adalah sistem file tersebut tidak menyediakan semua primitif yang diperlukan untuk file yang dipetakan memori Chronicle Antrian. Jika ada jaringan yang diperlukan (misalnya untuk membuat data dapat diakses ke beberapa host), satu -satunya cara yang didukung adalah Replikasi Antrian Chronicle (Fitur Perusahaan). |
Sebagian besar sistem pesan berpusat pada konsumen. Kontrol aliran diimplementasikan untuk menghindari konsumen yang kelebihan beban; bahkan sebentar. Contoh umum adalah server yang mendukung banyak pengguna GUI. Pengguna -pengguna tersebut mungkin berada di mesin yang berbeda (OS dan perangkat keras), kualitas jaringan yang berbeda (latensi dan bandwidth), melakukan berbagai hal lain pada waktu yang berbeda. Untuk alasan ini masuk akal bagi konsumen klien untuk memberi tahu produsen kapan harus mundur, menunda data apa pun sampai konsumen siap untuk mengambil lebih banyak data.
Chronicle Queue adalah solusi produsen-sentris dan melakukan segala yang mungkin untuk tidak pernah mendorong kembali pada produser, atau mengatakannya untuk memperlambat. Ini membuatnya menjadi alat yang ampuh, memberikan buffer besar antara sistem Anda, dan produsen hulu di mana Anda memiliki sedikit, atau tidak, kontrol.
Penerbit data pasar tidak memberi Anda opsi untuk mendorong kembali produser lama; Jika sama sekali. Beberapa pengguna kami mengkonsumsi data dari CME OPRA. Ini menghasilkan puncak 10 juta acara per menit, dikirim sebagai paket UDP tanpa coba lagi. Jika Anda ketinggalan, atau menjatuhkan paket, maka itu hilang. Anda harus mengkonsumsi dan merekam paket -paket itu secepat mereka datang kepada Anda, dengan sedikit buffering di adaptor jaringan. Untuk data pasar khususnya, berarti waktu nyata dalam beberapa mikrodetik ; Itu tidak berarti intra-hari (siang hari).
Antrian Chronicle cepat dan efisien, dan telah digunakan untuk meningkatkan kecepatan data dilewatkan di antara utas. Selain itu, itu juga menyimpan catatan setiap pesan yang disahkan yang memungkinkan Anda untuk secara signifikan mengurangi jumlah penebangan yang perlu Anda lakukan.
Sistem kepatuhan diperlukan oleh semakin banyak sistem hari ini. Setiap orang harus memilikinya, tetapi tidak ada yang ingin diperlambat oleh mereka. Dengan menggunakan Chronicle Queue untuk buffer data antara sistem yang dipantau dan sistem kepatuhan, Anda tidak perlu khawatir tentang dampak perekaman kepatuhan untuk sistem yang dipantau. Sekali lagi, Chronicle Queue dapat mendukung jutaan peristiwa per detik, per-server, dan data akses yang telah disimpan selama bertahun-tahun.
Chronicle Queue mendukung IPC latensi rendah (komunikasi antar proses) antara JVM pada mesin yang sama dalam urutan besarnya 1 mikrodetik; serta antara mesin dengan latensi khas 10 mikrodetik untuk throughput sederhana beberapa ratus ribu. Chronicle Queue mendukung throughput jutaan peristiwa per detik, dengan latensi mikrodetik yang stabil.
Lihat artikel tentang penggunaan antrian kronik dalam layanan microser
Antrian kronik dapat digunakan untuk membangun mesin negara. Semua informasi tentang keadaan komponen tersebut dapat direproduksi secara eksternal, tanpa akses langsung ke komponen, atau ke keadaan mereka. Ini secara signifikan mengurangi kebutuhan untuk penebangan tambahan. Namun, penebangan apa pun yang Anda butuhkan dapat direkam dengan sangat rinci. Ini membuat pembuatan DEBUG
yang memungkinkan dalam produksi praktis. Ini karena biaya pencatatan sangat rendah; Kurang dari 10 mikrodetik. Log dapat direplikasi secara terpusat untuk konsolidasi log. Chronicle Queue digunakan untuk menyimpan 100+ TB data, yang dapat diputar ulang dari titik waktu mana pun.
Komponen streaming non-batching sangat berkinerja, deterministik, dan dapat direproduksi. Anda dapat mereproduksi bug yang hanya muncul setelah sejuta acara yang dimainkan dalam urutan tertentu, dengan waktu yang dipercepat realistis. Ini membuat menggunakan pemrosesan aliran menarik untuk sistem yang membutuhkan tingkat kualitas yang tinggi.
Rilis tersedia di Maven Central sebagai:
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >
Lihat Catatan Rilis Antrian Chronicle dan dapatkan nomor versi terbaru. Snapshot tersedia di https://oss.sonatype.org
Catatan | Kelas-kelas yang berada di salah satu paket 'internal', 'IMP ' , dan 'utama' (yang terakhir berisi berbagai metode utama yang dapat dijalan waktu dengan alasan apa pun . Lihat masing-masing file package-info.java untuk detailnya. |
Dalam Chronicle Antrian V5 Tailer sekarang hanya dibaca, dalam Chronicle Antreue V4 kami memiliki konsep pengindeksan malas, di mana appenders tidak akan menulis indeks tetapi sebaliknya pengindeksan dapat dilakukan oleh penjahit. Kami memutuskan untuk menjatuhkan pengindeksan malas di V5; Membuat Tailers hanya baca tidak hanya menyederhanakan antrian kronik tetapi juga memungkinkan kita untuk menambahkan optimisasi di tempat lain dalam kode.
Model penguncian dari antrian Chronicle diubah di V5, di Chronicle Antrian V4 Kunci Tulis (untuk mencegah penulisan bersamaan dengan antrian) ada dalam file .cq4. Di V5 ini dipindahkan ke satu file yang disebut toko meja (metadata.cq4t). Ini menyederhanakan kode penguncian secara internal karena hanya file Table Store yang harus diperiksa.
Anda dapat menggunakan Chronicle Queue V5 untuk membaca pesan yang ditulis dengan Chronicle Queue V4, tetapi ini tidak dijamin selalu berfungsi - jika, misalnya, Anda membuat antrian V4 Anda dengan wireType(WireType.FIELDLESS_BINARY)
maka kronik antrian V5 tidak akan dapat ke Baca header antrian. Kami memiliki beberapa tes untuk pembacaan V5 antrian V4 tetapi ini terbatas dan semua skenario mungkin tidak didukung.
Anda tidak dapat menggunakan Chronicle Queue V5 untuk menulis untuk mencatat antrian antrian V4.
Chronicle Queue V4 adalah penulisan ulang antrian kronik lengkap yang memecahkan masalah-masalah berikut yang ada di V3.
Tanpa pesan yang menggambarkan diri sendiri, pengguna harus membuat fungsionalitas mereka sendiri untuk membuang pesan dan penyimpanan data jangka panjang. Dengan V4 Anda tidak perlu melakukan ini, tetapi Anda bisa jika Anda mau.
Antrian Vanilla Chronicle akan membuat file per utas. Ini baik -baik saja jika jumlah utas dikontrol, namun, banyak aplikasi memiliki sedikit atau tidak ada kontrol atas berapa banyak utas yang digunakan dan ini menyebabkan masalah kegunaan.
Konfigurasi untuk Indeks dan Vanilla Chronicle sepenuhnya dalam kode sehingga pembaca harus memiliki konfigurasi yang sama dengan penulis dan tidak selalu jelas apa itu.
Tidak ada cara bagi produser untuk mengetahui berapa banyak data yang telah direplikasi ke mesin kedua. Satu -satunya solusi adalah mereplikasi data kembali ke produsen.
Anda perlu menentukan ukuran data untuk disediakan sebelum Anda mulai menulis pesan Anda.
Anda perlu melakukan penguncian sendiri untuk appender saat menggunakan Chronicle yang diindeks.
Di Chronicle Antrian V3, semuanya dalam hal byte, bukan kawat. Ada dua cara untuk menggunakan byte di Chronicle Antrian V4. Anda dapat menggunakan metode writeBytes
dan readBytes
, atau Anda bisa mendapatkan bytes()
dari kawat. Misalnya:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ()));
try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}
Chronicle Queue Enterprise Edition adalah versi yang didukung secara komersial dari antrian Chronicle Open Source kami yang sukses. Dokumentasi open source diperluas oleh dokumen -dokumen berikut untuk menggambarkan fitur tambahan yang tersedia saat Anda dilisensikan untuk edisi perusahaan. Ini adalah:
Enkripsi antrian pesan dan pesan. Untuk informasi lebih lanjut, lihat dokumentasi enkripsi.
Replikasi TCP/IP (dan opsional UDP) antara host untuk memastikan cadangan waktu nyata dari semua data antrian Anda. Untuk informasi lebih lanjut, lihat dokumentasi replikasi, protokol replikasi antrian dibahas dalam protokol replikasi.
Dukungan zona waktu untuk penjadwalan rollover antrian harian. Untuk informasi lebih lanjut, lihat Dukungan TimeZone.
Dukungan mode async untuk memberikan kinerja yang lebih baik pada throughput tinggi pada sistem file yang lebih lambat. Untuk informasi lebih lanjut, lihat mode async dan juga kinerja.
Pra-Toucher untuk outlier yang lebih baik, lihat pra-toucher dan konfigurasinya
Selain itu, Anda akan sepenuhnya didukung oleh para ahli teknis kami.
Untuk informasi lebih lanjut tentang Edisi Perusahaan Antrian Chronicle, silakan hubungi [email protected].
Antrian Chronicle didefinisikan oleh SingleChronicleQueue.class
yang dirancang untuk mendukung:
menggulung file setiap hari, mingguan atau setiap jam,
penulis bersamaan pada mesin yang sama,
Pembaca bersamaan pada mesin yang sama atau di beberapa mesin melalui replikasi TCP (dengan Chronicle Queue Enterprise),
Pembaca dan penulis bersamaan antara Docker atau beban kerja yang dimasukkan lainnya
Serialisasi dan deserialisasi nol salinan,
Jutaan menulis/membaca per detik pada perangkat keras komoditas.
Sekitar 5 juta pesan/detik untuk pesan 96-byte pada prosesor i7-4790. Struktur direktori antrian adalah sebagai berikut:
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling.
Formatnya terdiri dari byte yang disesuaikan dengan ukuran yang diformat menggunakan BinaryWire
atau TextWire
. Chronicle Queue dirancang untuk didorong dari kode. Anda dapat dengan mudah menambahkan antarmuka yang sesuai dengan kebutuhan Anda.
Catatan | Karena operasi tingkat rendah yang cukup rendah, Operasi Baca/Tulis Antrian Chronicle dapat melemparkan pengecualian yang tidak terkendali. Untuk mencegah kematian utas, mungkin praktis untuk menangkap RuntimeExceptions dan log/menganalisisnya sebagaimana mestinya. |
Catatan | Untuk demonstrasi bagaimana kronik antrian dapat digunakan lihat demo antrian kronik dan untuk dokumentasi java lihat kronik antrian javadocs |
Pada bagian berikut, pertama kami memperkenalkan beberapa terminologi dan referensi cepat untuk menggunakan antrian Chronicle. Kemudian, kami memberikan panduan yang lebih rinci.
Chronicle Queue adalah jurnal pesan yang bertahan yang mendukung penulis dan pembaca bersamaan bahkan di beberapa JVM pada mesin yang sama. Setiap pembaca melihat setiap pesan, dan pembaca dapat bergabung kapan saja dan masih melihat setiap pesan.
Catatan | Kami sengaja menghindari istilah konsumen dan sebaliknya menggunakan pembaca karena pesan tidak dikonsumsi/dihancurkan dengan membaca. |
Antrian Chronicle memiliki konsep utama berikut:
Kutipan
Kutipan adalah wadah data utama dalam antrian kronik. Dengan kata lain, setiap antrian kronik terdiri dari kutipan. Menulis pesan ke antrian Chronicle berarti memulai kutipan baru, menulis pesan ke dalamnya, dan menyelesaikan kutipan di akhir.
Appender
Appender adalah sumber pesan; sesuatu seperti iterator di lingkungan Chronicle. Anda menambahkan data yang menambahkan antrian Chronicle saat ini. Ini dapat melakukan penulisan berurutan dengan menambahkan ke akhir antrian saja. Tidak ada cara untuk memasukkan, atau menghapus kutipan.
Penjahit
Tailer adalah pembaca kutipan yang dioptimalkan untuk bacaan berurutan. Ini dapat melakukan bacaan berurutan dan acak, baik ke depan maupun ke belakang. Tailers membaca pesan yang tersedia berikutnya setiap kali dipanggil. Berikut dijamin dalam antrian Chronicle:
Untuk setiap appender , pesan ditulis dalam urutan yang ditulis appender. Pesan oleh appenders yang berbeda diselingi,
Untuk setiap penjahit , itu akan melihat setiap pesan untuk suatu topik dalam urutan yang sama dengan setiap penjahit lainnya,
Saat direplikasi, setiap replika memiliki salinan setiap pesan.
Chronicle Queue kurang broker. Jika Anda membutuhkan arsitektur dengan broker, silakan hubungi [email protected].
File bergulir dan antrian file
Chronicle Queue dirancang untuk menggulung file -nya tergantung pada siklus roll yang dipilih saat antrian dibuat (lihat rollcycles). Dengan kata lain, file antrian dibuat untuk setiap siklus roll yang memiliki ekstensi cq4
. Ketika siklus roll mencapai titik itu harus digulung, Appender akan secara atom menulis tanda EOF
di akhir file saat ini untuk menunjukkan bahwa tidak ada appender lain yang harus menulis ke file ini dan tidak ada penipu yang harus membaca lebih lanjut, dan sebaliknya semua orang harus menggunakan file baru.
Jika prosesnya ditutup, dan restart nanti ketika siklus roll harus menggunakan file baru, appender akan mencoba menemukan file lama dan menulis tanda EOF
di dalamnya untuk membantu pelaku pembacaannya.
Topik
Setiap topik adalah direktori file antrian. Jika Anda memiliki topik yang disebut mytopic
, tata letaknya bisa terlihat seperti ini:
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4
Untuk menyalin semua data selama satu hari (atau siklus), Anda dapat menyalin file untuk hari itu ke mesin pengembangan Anda untuk pengujian replay.
Pembatasan topik dan pesan
Topik terbatas untuk menjadi string yang dapat digunakan sebagai nama direktori. Dalam suatu topik, Anda dapat memiliki sub-topik yang dapat berupa tipe data apa pun yang dapat diserialisasi. Pesan dapat berupa data serial apa pun.
Dukungan Antrian Chronicle:
Objek Serializable
, meskipun ini harus dihindari karena tidak efisien
Objek Externalizable
lebih disukai jika Anda ingin menggunakan Java API standar.
byte[]
dan String
Marshallable
; Pesan yang menggambarkan diri yang dapat ditulis sebagai YAML, Binary Yaml, atau JSON.
BytesMarshallable
yang merupakan biner tingkat rendah, atau pengkodean teks.
Bagian ini memberikan referensi cepat untuk menggunakan antrian Chronicle untuk secara singkat menunjukkan cara membuat, menulis/membaca ke dalam/dari antrian.
Konstruksi antrian kronik
Membuat contoh antrian Chronicle berbeda dari hanya memanggil konstruktor. Untuk membuat instance, Anda harus menggunakan ChronicleQueueBuilder
.
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build ();
Dalam contoh ini kami telah membuat IndexedChronicle
yang menciptakan dua RandomAccessFiles
; satu untuk indeks, dan satu untuk data yang memiliki nama relatif:
${java.io.tmpdir}/getting-started/{today}.cq4
Menulis untuk antrian
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
Membaca dari antrian
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
Juga, metode ChronicleQueue.dump()
dapat digunakan untuk membuang konten mentah sebagai string.
queue . dump ();
Pembersihan
Chronicle Queue menyimpan datanya di luar-heap, dan disarankan agar Anda menelepon close()
setelah Anda selesai bekerja dengan CHRONICLE QUEUE, untuk membebaskan sumber daya.
Catatan | Tidak ada data yang akan hilang jika Anda melakukan ini. Ini hanya untuk membersihkan sumber daya yang digunakan. |
queue . close ();
Menyatukan semuanya
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
}
Anda dapat mengonfigurasi antrian Chronicle menggunakan parameter konfigurasinya atau properti sistem. Selain itu, ada berbagai cara menulis/membaca ke dalam/dari antrian seperti penggunaan proxy dan menggunakan MethodReader
dan MethodWriter
.
Chronicle Queue (CQ) dapat dikonfigurasi melalui sejumlah metode pada kelas SingleChronicleQueueBuilder
. Beberapa parameter yang paling ditanyakan oleh pelanggan kami dijelaskan di bawah ini.
Rollcycle
Parameter RollCycle
mengkonfigurasi laju di mana CQ akan menggulung file antrian yang mendasarinya. Misalnya, menggunakan cuplikan kode berikut akan menghasilkan file antrian yang digulirkan (yaitu file baru yang dibuat) setiap jam:
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build ()
Setelah siklus roll antrian telah ditetapkan, itu tidak dapat diubah di kemudian hari. Setiap contoh lebih lanjut dari SingleChronicleQueue
yang dikonfigurasi untuk menggunakan jalur yang sama harus dikonfigurasi untuk menggunakan siklus roll yang sama, dan jika tidak, maka siklus roll akan diperbarui agar sesuai dengan siklus roll yang ada. Dalam hal ini, pesan log peringatan akan dicetak untuk memberi tahu pengguna perpustakaan tentang situasi:
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}
Output Konsol:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.
Jumlah maksimum pesan yang dapat disimpan dalam file antrian tergantung pada siklus roll. Lihat FAQ untuk informasi lebih lanjut tentang ini.
Dalam antrian Chronicle, waktu rollover didasarkan pada UTC. Fitur TimeZone Rollover Enterprise memperluas kemampuan antrian Chronicle untuk menentukan waktu dan periodisitas rollover antrian, bukan UTC. Untuk informasi lebih lanjut, lihat rollover antrian zona waktu.
Kelas Chronicle Queue FileUtil
menyediakan metode yang berguna untuk mengelola file antrian. Lihat Mengelola File Roll secara langsung.
Samarupandang
Dimungkinkan untuk mengonfigurasi bagaimana Chronicle Queue akan menyimpan data dengan secara eksplisit mengatur WireType
:
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )
Misalnya:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )
Meskipun dimungkinkan untuk secara eksplisit memberikan wirasype saat membuat pembangun, itu tidak disarankan karena tidak semua jenis kawat didukung oleh antrian Chronicle. Secara khusus, jenis kawat berikut tidak didukung:
Teks (dan pada dasarnya semuanya berdasarkan teks, termasuk JSON dan CSV)
MENTAH
Read_any
Blocksize
Ketika antrian dibaca/ditulis, bagian dari file yang saat ini sedang dibaca/ditulis dipetakan ke segmen memori. Parameter ini mengontrol ukuran blok pemetaan memori. Anda dapat mengubah parameter ini menggunakan metode SingleChronicleQueueBuilder.blockSize(long blockSize)
jika diperlukan.
Catatan | Anda harus menghindari mengubah blockSize secara tidak perlu. |
Jika Anda mengirim pesan besar maka Anda harus mengatur blockSize
besar yaitu blockSize
harus setidaknya empat kali ukuran pesan.
Peringatan | Jika Anda menggunakan blockSize kecil untuk pesan besar, Anda menerima IllegalStateException dan penulisannya dibatalkan. |
Kami menyarankan Anda menggunakan blockSize
yang sama untuk setiap instance antrian saat mereplikasi antrian, blockSize
tidak ditulis ke metadata antrian, jadi idealnya harus diatur ke nilai yang sama saat membuat contoh antrian kronik (ini direkomendasikan tetapi jika Anda mau untuk berjalan dengan blocksize
yang berbeda yang Anda bisa).
Tip | Gunakan blockSize yang sama untuk setiap contoh antrian yang direplikasi. |
indexspacing
Parameter ini menunjukkan ruang antara kutipan yang secara eksplisit diindeks. Angka yang lebih tinggi berarti kinerja penulisan berurutan yang lebih tinggi tetapi akses acak yang lebih lambat dibaca. Kinerja baca berurutan tidak terpengaruh oleh properti ini. Misalnya, jarak indeks default berikut dapat dikembalikan:
16 (teliti)
64 (setiap hari)
Anda dapat mengubah parameter ini menggunakan metode SingleChronicleQueueBuilder.indexSpacing(int indexSpacing)
.
IndexCount
Ukuran setiap array indeks, serta jumlah total array indeks per file antrian.
Catatan | IndexCount 2 adalah jumlah maksimum entri antrian yang diindeks. |
Catatan | Lihat Bagian Pengindeksan Kutipan dalam Antrian Chronicle dari Panduan Pengguna ini untuk informasi lebih lanjut dan contoh menggunakan indeks. |
readbufferMode, writeBufferMode
Parameter ini mendefinisikan Buffermode untuk membaca atau menulis yang memiliki opsi berikut:
None
- default (dan satu -satunya yang tersedia untuk pengguna sumber terbuka), tidak ada buffering;
Copy
- Digunakan bersama dengan enkripsi;
Asynchronous
- Gunakan buffer asinkron saat membaca dan/atau menulis, disediakan oleh Chronicle Async Mode.
Buffercapacity
Kapasitas RingBuffer dalam byte saat menggunakan bufferMode: Asynchronous
Dalam antrian Chronicle kami merujuk pada tindakan menulis data Anda ke The Chronicle Queue, sebagai menyimpan kutipan. Data ini dapat dibuat dari jenis data apa pun, termasuk teks, angka, atau gumpalan serial. Pada akhirnya, semua data Anda, terlepas dari apa itu, disimpan sebagai serangkaian byte.
Tepat sebelum menyimpan kutipan Anda, Chronicle Antrian Cadangan header 4-byte. Chronicle Queue menulis panjang data Anda ke header ini. Dengan cara ini, ketika kronik antrian datang untuk membaca kutipan Anda, ia tahu berapa lama setiap gumpalan data. Kami merujuk pada header 4-byte ini, bersama dengan kutipan Anda, sebagai dokumen. Antrian Chronicle yang berbicara secara ketat dapat digunakan untuk membaca dan menulis dokumen.
Catatan | Di dalam header 4-byte ini, kami juga memesan beberapa bit untuk sejumlah operasi internal, seperti penguncian, untuk membuat kronik-antrian-aman-aman di kedua prosesor dan utas. Yang penting untuk dicatat adalah bahwa karena ini, Anda tidak dapat secara ketat mengubah 4 byte menjadi bilangan bulat untuk menemukan panjang gumpalan data Anda. |
Seperti yang dinyatakan sebelumnya, Chronicle Queue menggunakan appender untuk menulis ke antrian dan penjahit untuk dibaca dari antrian. Tidak seperti solusi antrian Java lainnya, pesan tidak hilang ketika mereka dibaca dengan penjahit. Ini dibahas secara lebih rinci di bagian di bawah ini pada "Membaca dari antrian menggunakan penjahit". Untuk menulis data ke antrian Chronicle, Anda harus terlebih dahulu membuat appender:
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}
Chronicle Queue menggunakan antarmuka tingkat rendah berikut untuk menulis data:
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
}
Tutup pada percobaan-dengan-sumber daya, adalah titik ketika panjang data ditulis ke header. Anda juga dapat menggunakan DocumentContext
untuk mengetahui indeks bahwa data Anda baru saja ditetapkan (lihat di bawah). Anda kemudian dapat menggunakan indeks ini untuk pindah ke/mencari kutipan ini. Setiap kutipan kronik antrian memiliki indeks yang unik.
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
}
Metode tingkat tinggi di bawah ini seperti writeText()
adalah metode kenyamanan untuk memanggil appender.writingDocument()
, tetapi kedua pendekatan pada dasarnya melakukan hal yang sama. Kode writeText(CharSequence text)
terlihat seperti ini:
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}
Jadi, Anda memiliki pilihan sejumlah antarmuka tingkat tinggi, turun ke API tingkat rendah, untuk memori mentah.
Ini adalah API tingkat tertinggi yang menyembunyikan fakta bahwa Anda menulis untuk pesan sama sekali. Manfaatnya adalah Anda dapat bertukar panggilan ke antarmuka dengan komponen nyata, atau antarmuka ke protokol yang berbeda.
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));
Anda dapat menulis "pesan yang menggambarkan diri sendiri". Pesan semacam itu dapat mendukung perubahan skema. Mereka juga lebih mudah dipahami saat men -debug atau mendiagnosis masalah.
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));
Anda dapat menulis "data mentah" yang menggambarkan diri sendiri. Jenisnya akan selalu benar; Posisi adalah satu -satunya indikasi tentang arti nilai -nilai tersebut.
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));
Anda dapat menulis "data mentah" yang tidak menggambarkan diri sendiri. Pembaca Anda harus tahu apa arti data ini, dan jenis yang digunakan.
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));
Di bawah ini, cara level terendah untuk menulis data diilustrasikan. Anda mendapatkan alamat untuk memori mentah dan Anda dapat menulis apa pun yang Anda inginkan.
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});
Anda dapat mencetak isi antrian. Anda dapat melihat dua yang pertama, dan dua pesan terakhir menyimpan data yang sama.
// dump the content of the queue
System . out . println ( queue . dump ());
Cetakan:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World
Membaca antrian mengikuti pola yang sama seperti menulis, kecuali ada kemungkinan tidak ada pesan ketika Anda mencoba membacanya.
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
}
Anda dapat mengubah setiap pesan menjadi panggilan metode berdasarkan konten pesan, dan memiliki kronik antrian secara otomatis deserialisasi argumen metode. Memanggil reader.readOne()
akan secara otomatis melewatkan (memfilter) pesan apa pun yang tidak cocok dengan pembaca metode Anda.
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());
Anda dapat memecahkan kode pesan sendiri.
Catatan | Nama, jenis, dan urutan bidang tidak harus cocok. |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));
Anda dapat membaca nilai data yang menggambarkan diri sendiri. Ini akan memeriksa jenisnya benar, dan mengonversi sesuai kebutuhan.
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));
Anda dapat membaca data mentah sebagai primitif dan string.
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));
Atau, Anda bisa mendapatkan alamat memori yang mendasarinya dan mengakses memori asli.
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
}));
Catatan | Setiap penipu melihat setiap pesan. |
Abstraksi dapat ditambahkan untuk memfilter pesan, atau menetapkan pesan hanya untuk satu prosesor pesan. Namun, secara umum Anda hanya perlu satu penerima utama untuk suatu topik, dengan kemungkinan, beberapa penerima pendukung untuk pemantauan dll.
Karena Chronicle Queue tidak mempartisi topiknya, Anda mendapatkan pemesanan total semua pesan dalam topik itu. Di seluruh topik, tidak ada jaminan pemesanan; Jika Anda ingin memutar ulang deterministik dari sistem yang mengkonsumsi dari banyak topik, kami sarankan untuk memutar ulang dari output sistem itu.
Chronicle Queue tailer dapat membuat penangan file, penangan file dibersihkan setiap kali metode close()
yang terkait dipanggil atau setiap kali JVM menjalankan koleksi sampah. Jika Anda menulis kode Anda tidak memiliki jeda GC dan Anda secara eksplisit ingin membersihkan file penangan, Anda dapat menghubungi yang berikut:
(( StoreTailer ) tailer ). releaseResources ()
ExcerptTailer.toEnd()
Dalam beberapa aplikasi, mungkin perlu untuk mulai membaca dari akhir antrian (misalnya dalam skenario restart). Untuk kasus penggunaan ini, ExcerptTailer
menyediakan metode toEnd()
. Ketika arah penjahit FORWARD
(secara default, atau sebagaimana ditetapkan oleh ExcerptTailer.direction
Method), kemudian memanggil toEnd()
akan menempatkan Tailer tepat setelah catatan terakhir yang ada dalam antrian. Dalam hal ini, penjahit sekarang siap untuk membaca catatan baru yang ditambahkan ke antrian. Sampai setiap pesan baru ditambahkan ke antrian, tidak akan ada DocumentContext
baru yang tersedia untuk dibaca:
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();
Jika perlu untuk membaca mundur melalui antrian dari akhir, maka penjahit dapat diatur untuk membaca mundur:
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd ();
Saat membaca ke belakang, maka metode toEnd()
akan memindahkan tailer ke catatan terakhir dalam antrian. Jika antrian tidak kosong, maka akan ada DocumentContext
yang tersedia untuk dibaca:
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();
Alias bernama tailer.
Ini dapat bermanfaat untuk memiliki penjahit yang berlanjut dari tempat itu untuk memulai kembali aplikasi.
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}
Tailer "a" pesan terakhir membaca 2
Tailer "a" selanjutnya membaca pesan 3
Tailer "b" terakhir membaca pesan 0
Tailer "b" next reads message 1
This is from the RestartableTailerTest
where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart()
, toEnd()
, moveToIndex()
or reads a message.
Catatan | The direction() is not preserved across restarts, only the next index to be read. |
Catatan | The index of a tailer is only progressed when the DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4
:
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
This can often be a bit difficult to read, so it is better to dump the cq4
files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain
or net.openhft.chronicle.queue.ChronicleReaderMain
. DumpMain
performs a simple dump to the terminal while ChronicleReaderMain
handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4
file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar
, from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
Tip | To find out which version of jars to include please, refer to the chronicle-bom . |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
This will dump the 19700101-02.cq4
file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining
Catatan | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh
located in the Chonicle-Queue/bin
-folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/dump_queue.sh <file path>
ChronicleReaderMain
The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain
(in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain
:
usage: ChronicleReaderMain -a <binary-arg> Argument to pass to binary search class -b <binary-search> Use this class as a comparator to binary search -cbl <content-based-limiter> Specify a content-based limiter -cblArg <content-based-limiter-argument> Specify an argument for use by the content-based limiter -d <directory> Directory containing chronicle queue files -e <exclude-regex> Do not display records containing this regular expression -f Tail behaviour - wait for new records to arrive -g Show message history (when using method reader) -h Print this help and exit -i <include-regex> Display records containing this regular expression -k Read the queue in reverse -l Squash each output message into a single line -m <max-history> Show this many records from the end of the data set -n <from-index> Start reading from this index (eg 0x123ABE) -named <named> Named tailer ID -r <as-method-reader> Use when reading from a queue generated using a MethodWriter -s Display index -w <wire-type> Control output ie JSON -x <max-results> Limit the number of results to output -z Print timestamps using the local timezone
Just as with DumpQueue
you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build >
Once the Uber jar is present, you can run ChronicleReaderMain
from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh
which again is located in the Chonicle-Queue/bin
-folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/queue_reader.sh <options>
ChronicleWriter
If using MethodReader
and MethodWriter
then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain
or the shell script queue_writer.sh
eg
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method name
If you want to write to the below "doit" method
public interface MyInterface {
void doit ( DTO dto );
}
public class DTO extends SelfDescribingMarshallable { private int age; nama string pribadi; }
Then you can call ChronicleWriterMain -d queue doit x.yaml
with either (or both) of the below Yamls:
{
age : 19,
name : Henry
}
atau
!x.y.z.DTO {
age : 42,
name : Percy
}
If DTO
makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface
, where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}
To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));
These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}
To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());
Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end over across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId
of the queue.
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));
A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}
First write
First read
Write of the result of the read.
What triggered this event.
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender();
the acquireAppender()
uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();
will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()
Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
Catatan | You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
for more information on this see :
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
Move to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
}
You can add a StoreFileListener
to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY
cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber
providing up to 4 billion entries per day. if more messages per day are anticipated, the XLARGE_DAILY
cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor
class) as the JVM can crash when allocating a new memory mapped file if disk space becomes low enough. The disk space monitor checks (for each FileStore you are using Chronicle Queues on): that there is less than 200MB free. If so you will see:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );
otherwise it will check for the threshold percentage and log out this message:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );
The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously Chronicle Queue stores its data off-heap in a '.cq4' file. So whenever you wish to append data to this file or read data into this file, chronicle queue will create a file handle . Typically, Chronicle Queue will create a new '.cq4' file every day. However, this could be changed so that you can create a new file every hour, every minute or even every second.
If we create a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, If you had written 10 seconds worth of data and then you wish to read this data, chronicle would have to scan across 10 files. To reduce the creation of the file handles, chronicle queue cashes them lazily and when it comes to writing data to the queue files, care-full consideration must be taken when closing the files, because on most OS's a close of the file, will force any data that has been appended to the file, to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher
is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute()
method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown()
method can be called to close the supplied queue and release any other resources. Invocation of the execute()
method after shutdown()
has been called will cause an IllegalStateException
to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle
(defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
Peringatan | earlyAcquireNextCycle is off by default and if it is going to be turned on, you should very carefully stress test before and after turning it on. Basically what you experience is related to your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs
(defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs
.
SingleChronicleQueueExcerpts.dontWrite
(defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle
from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle
is set to true
and pretoucherPrerollTimeMs
to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" );
The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute()
method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
}
The method close()
, closes the Pretoucher and releases its resources.
pretouch . close ();
Catatan | The Pretoucher is an Enterprise feature |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
Catatan | There is a step in latency at around 10 million message per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument()
is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument()
call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close()
is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true);
Catatan | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException . This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
Hasil:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark
), YMMV - always do your own benchmarks:
1 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 90.40 90.59 91.17 0.42 90: 179.52 180.29 97.50 36.14 99: 187.33 186.69 186.82 0.05 99.7: 213.57 198.72 217.28 5.86 -------------------------------------------------- -------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 179.14 179.26 180.93 0.62 90: 183.49 183.36 185.92 0.92 99: 192.19 190.02 215.49 8.20 99.7: 240.70 228.16 258.88 8.24 -------------------------------------------------- -------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 86.05 85.60 86.24 0.50 90: 170.18 169.79 170.30 0.20 99: 176.83 176.58 177.09 0.19 99.7: 183.36 185.92 183.49 0.88 -------------------------------------------------- -------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.24 85.98 86.11 0.10 90: 89.89 89.44 89.63 0.14 99: 169.66 169.79 170.05 0.10 99.7: 175.42 176.32 176.45 0.05 -------------------------------------------------- -------------------------------------------------- --------------
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 691.46 699.65 701.18 0.15 90: 717.57 722.69 721.15 0.14 99: 752.90 748.29 748.29 0.00 99.7: 1872.38 1743.36 1780.22 1.39 -------------------------------------------------- -------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 350.59 353.66 353.41 0.05 90: 691.46 701.18 697.60 0.34 99: 732.42 733.95 729.34 0.42 99.7: 1377.79 1279.49 1302.02 1.16 -------------------------------------------------- -------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 342.40 344.96 344.45 0.10 90: 357.25 360.32 359.04 0.24 99: 688.38 691.97 691.46 0.05 99.7: 1376.77 1480.19 1383.94 4.43 -------------------------------------------------- -------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 343.68 345.47 346.24 0.15 90: 360.06 362.11 363.14 0.19 99: 694.02 698.62 699.14 0.05 99.7: 1400.32 1510.91 1435.14 3.40 -------------------------------------------------- -------------------------------------------------- --------------
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain
This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop
, you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 25 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 21 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 14 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal()
if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint
into the code because thread dumps are only reported at safe-points.
Hasil:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | Throughput |
< 30 microseconds 99.3% of the time | 7 million message per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 milliseconds 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million message per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.