https://player.vimeo.com/video/201989439
Die Chreue-Warteschlange ist ein anhaltendes Rahmen für Hochleistungsanwendungen mit niedrigem Latenz.
Dieses Projekt deckt die Java -Version der Chreuik -Warteschlange ab. Eine C ++ - Version dieses Projekts ist ebenfalls verfügbar und unterstützt Java/C ++ - Interoperabilität sowie zusätzliche Sprachbindungen, z. B. Python. Wenn Sie an der Bewertung der C ++ - Version interessiert sind, wenden Sie sich bitte an [email protected].
At first glance Chronicle Queue can be seen as simply another queue implementation . Es hat jedoch wichtige Designentscheidungen, die hervorgehoben werden sollten. Die Chreue- Warteschlange von Chronicle bietet eine Umgebung, in der Anwendungen nicht an der Müllsammlung (GC) leiden. Bei der Implementierung von Hochleistungs- und Speicher-intensiven Anwendungen (Sie haben den ausgefallenen Begriff "BigData" in Java gehört, ist eines der größten Probleme die Müllsammlung.
Mit der Chreue-Warteschlange können Nachrichten am Ende einer Warteschlange hinzugefügt werden ("angehängt"), aus der Warteschlange ("Tailed") gelesen und auch die Suche nach Zufallszugriff unterstützt.
Sie können sich als Chreue der Chronik in Betracht ziehen, der einem dauerhaften/persistierten Thema mit niedriger Latenz-Broker ähnlich ist, das Nachrichten mit verschiedenen Typen und Größen enthalten kann. Die Chreue -Warteschlange ist eine verteilte unbegrenzte Warteschlange, die:
Unterstützt asynchrone RMI und veröffentlichen/abonnieren Schnittstellen mit Mikrosekundenlatenzen.
Übergibt Nachrichten zwischen JVMs in unter einer Mikrosekunde
Übergibt Nachrichten zwischen JVMS auf verschiedenen Maschinen per Replikation in unter 10 Mikrosekunden (Unternehmensfunktion)
Bietet stabile, weiche Echtzeit-Latenzen in die Millionen von Nachrichten pro Sekunde für einen einzelnen Thread zu einer Warteschlange; mit Gesamtreihenfolge jedes Ereignisses.
Bei der Veröffentlichung von 40-Byte-Nachrichten erreichen wir einen hohen Prozentsatz der Zeit unter 1 Mikrosekunde. Die 99. Perzentillatenz ist die schlechteste 1 zu 100, und das 99,9. Perzentil ist die schlechteste 1 zu 1000 Latenz.
Chargengröße | 10 Millionen Ereignisse pro Minute | 60 Millionen Ereignisse pro Minute | 100 Millionen Ereignisse pro Minute |
---|---|---|---|
99%ile | 0,78 µs | 0,78 µs | 1,2 µs |
99,9%ile | 1,2 µs | 1,3 µs | 1,5 µs |
Chargengröße | 10 Millionen Ereignisse pro Minute | 60 Millionen Ereignisse pro Minute | 100 Millionen Ereignisse pro Minute |
---|---|---|---|
99%ile | 20 µs | 28 µs | 176 µs |
99,9%ile | 901 µs | 705 µs | 5,370 µs |
Notiz | 100 Millionen Events pro Minute schicken alle 660 Nanosekunden ein Ereignis. repliziert und bestehen bleiben. |
Wichtig | Diese Leistung wird nicht mit einer großen Gruppe von Maschinen erzielt. Dadurch wird ein Thread verwendet, um zu veröffentlichen, und einen Thread zum Verbrauch. |
Die Chreue der Chronik ist so konzipiert:
Seien Sie ein "alles speichern", das mit Mikrosekunden-Echtzeitlatenz lesen kann. Dies unterstützt auch die anspruchsvollsten Hochfrequenzhandelssysteme. Es kann jedoch in jeder Anwendung verwendet werden, bei der die Aufzeichnung von Informationen ein Problem darstellt.
Unterstützen Sie die zuverlässige Replikation mit Benachrichtigung entweder an den Appender (Autor of Message) oder einen Tailer (Leser der Nachricht), wenn eine Nachricht erfolgreich repliziert wurde.
Die Chreue -Warteschlange der Chroniknot übernimmt den Speicherplatz im Vergleich zum Speicher billig. Die Chreue -Warteschlange nutzt den Speicherplatz, den Sie haben, voll und ganz durch den Hauptspeicher Ihrer Maschine. Wenn Sie Spinning HDD verwenden, können Sie viele TBS Disk -Raum für kleine Kosten speichern.
Die einzige zusätzliche Software, die die Chreue -Warteschlange ausführen muss, ist das Betriebssystem. Es hat keinen Broker; Stattdessen verwendet es Ihr Betriebssystem, um die gesamte Arbeit zu erledigen. Wenn Ihre Anwendung stirbt, läuft das Betriebssystem weiterhin länger, sodass keine Daten verloren gehen. Auch ohne Replikation.
Da die Chreue-Warteschlange in der Chronik alle gespeicherten Daten in Speicher-Mapping-Dateien speichert, hat dies einen trivialen Overhead auf der Heap, auch wenn Sie über 100 TB Daten haben.
Chronik hat erhebliche Anstrengungen unternommen, um eine sehr geringe Latenz zu erreichen. In anderen Produkten, die sich auf die Unterstützung von Webanwendungen konzentrieren, sind Latenzen von weniger als 40 Millisekunden in Ordnung, da sie schneller sind, als Sie sehen können. Beispielsweise beträgt die Bildrate des Kinos 24 Hz oder etwa 40 ms.
Die Chreue -Warteschlange zielt darauf ab, eine Latenzen von unter 40 Mikrosekunden für 99% bis 99,99% der Fälle zu erreichen. Mithilfe der Chreue-Warteschlange ohne Replikation unterstützen wir Anwendungen mit Latenzen unter 40 Mikrosekunden, die von der End-to-End in mehreren Diensten übereinstimmen. Oft hängt die 99% ige Latenz der Chronikwarteschlange ab, die von der Auswahl des Betriebssystems und des Festplatten-Subsystems abhängig ist.
Die Replikation für die Chreue der Chronik unterstützt Chronicle Draht Enterprise. Dies unterstützt eine Echtzeitkomprimierung, die die Deltas für einzelne Objekte berechnet, wie sie geschrieben werden. Dies kann die Größe der Nachrichten um den Faktor 10 oder besser verringern, ohne dass sie angeordnet werden müssen. das heißt, ohne eine erhebliche Latenz einzuführen.
Die Chreue der Chronik unterstützt auch die Komprimierung von LZW, Snappy und GZIP. Diese Formate fügen jedoch eine erhebliche Latenz hinzu. Diese sind nur nützlich, wenn Sie strenge Einschränkungen bei der Netzwerkbandbreite haben.
Die Chreue der Chronik unterstützt eine Reihe von Semantik:
Jede Nachricht wird beim Neustart wiedergegeben.
Im Neustart werden nur neue Nachrichten gespielt.
Starten Sie mit dem Index des Eintrags von jedem bekannten Punkt neu.
Wiederholen Sie nur die Nachrichten, die Sie verpasst haben. Dies wird direkt unter Verwendung der MethodReader/Methodwriter -Bauherren unterstützt.
Auf den meisten Systemen System.nanoTime()
ist ungefähr die Anzahl der Nanosekunden, seit das System das letzte Mal neu gestartet wurde (obwohl sich verschiedene JVMs unterschiedlich verhalten können). Dies gilt für JVMs auf derselben Maschine, aber zwischen den Maschinen. Der absolute Unterschied bei Maschinen ist bedeutungslos. Die Informationen können jedoch verwendet werden, um Ausreißer zu erkennen. Sie können nicht bestimmen, was die beste Latenz ist, aber Sie können feststellen, wie weit Sie die besten Latenzen entfernt sind. Dies ist nützlich, wenn Sie sich auf die 99. Perzentillatenzen konzentrieren. Wir haben eine Klasse namens RunningMinimum
, um Zeiten von verschiedenen Maschinen zu erhalten und gleichzeitig eine Drift in der nanoTime
zwischen Maschinen auszugleichen. Je öfter Sie Messungen durchführen, desto genauer ist dieses laufende Minimum.
Die Chreue -Warteschlange verwaltet den Speicher nach Zyklus. Sie können einen StoreFileListener
hinzufügen, der Sie benachrichtigt, wenn eine Datei hinzugefügt wird und wenn sie nicht mehr zurückgehalten wird. Sie können alle Nachrichten für einen Tag gleichzeitig verschieben, komprimieren oder löschen. Hinweis: Wenn ein IO -Betrieb unter Windows unterbrochen wird, kann dies leider das zugrunde liegende Filechannel schließen.
Aus Leistungsgründen haben wir die Überprüfung nach Interrupts im Chreue -Code von Chronicle entfernt. Aus diesem Grund empfehlen wir Ihnen, die Chreuik -Warteschlange mit Code zu vermeiden, die Interrupts generiert. Wenn Sie keine Interrupts vermeiden können, empfehlen wir, eine separate Instanz der Chreuikwarteschlange pro Thread zu erstellen.
Die Chreue-Warteschlange wird am häufigsten für produziert-zentrierte Systeme verwendet, bei denen Sie für Tage oder Jahre viele Daten behalten müssen. Für Statistiken siehe Verwendung von Chronicle-Queue
Wichtig | Chreue der Chronik unterstützt kein Netzwerkdateisystem, sei es NFS, AFS, SAN-basierte Speicher oder irgendetwas anderes. Der Grund dafür ist, dass diese Dateisysteme nicht alle erforderlichen Primitiven für Speicher-Mappy-Dateien Chreus verwenden. Wenn ein Netzwerk benötigt wird (z. B. um die Daten für mehrere Hosts zugänglich zu machen), ist die einzige unterstützte Art und Weise die Replikation der Chronik -Warteschlangen (Enterprise -Funktion). |
Die meisten Messaging-Systeme sind konsumentenorientiert. Die Durchflussregelung wird implementiert, um zu vermeiden, dass der Verbraucher jemals überlastet wird. Auch für einen Moment. Ein häufiges Beispiel ist ein Server, der mehrere GUI -Benutzer unterstützt. Diese Benutzer sind möglicherweise in verschiedenen Maschinen (Betriebssystem und Hardware), unterschiedliche Qualitäten von Netzwerk (Latenz und Bandbreite), die zu unterschiedlichen Zeiten eine Vielzahl anderer Dinge durchführen. Aus diesem Grund ist es für den Kundenverbraucher sinnvoll, dem Hersteller zu sagen, wann er sich zurückziehen soll, und die Daten verzögert, bis der Verbraucher bereit ist, weitere Daten zu nehmen.
Die Chreue-Warteschlange ist eine produzentorientierte Lösung und tut alles, was möglich ist, um den Produzenten nie zurückzudrängen oder zu sagen, dass er langsamer wird. Dies macht es zu einem leistungsstarken Werkzeug und bietet einen großen Puffer zwischen Ihrem System und einem vorgelagerten Produzenten, über den Sie wenig oder nein kontrollieren.
Marktdatenverlage geben Ihnen nicht die Möglichkeit, den Produzenten lange zurückzudrängen. Wenn überhaupt. Einige unserer Benutzer verbrauchen Daten von CME OPRA. Dies erzeugt Spitzen von 10 Millionen Ereignissen pro Minute, die als UDP -Pakete ohne Wiederholung gesendet werden. Wenn Sie ein Paket verpassen oder fallen lassen, geht es verloren. Sie müssen diese Pakete so schnell konsumieren und aufnehmen, wie sie zu Ihnen kommen, mit sehr wenig Pufferung im Netzwerkadapter. Insbesondere für Marktdaten bedeutet Echtzeit in wenigen Mikrosekunden ; Es bedeutet nicht intraig (tagsüber).
Die Chreue der Chronik ist schnell und effizient und wurde verwendet, um die Geschwindigkeit zu erhöhen, dass die Daten zwischen Threads übergeben werden. Darüber hinaus wird auch eine Aufzeichnung jeder verabschiedeten Nachricht aufgenommen, damit Sie die Menge an Protokollierung, die Sie ausführen müssen, erheblich reduzieren können.
Compliance -Systeme werden heutzutage von immer mehr Systemen benötigt. Jeder muss sie haben, aber niemand will von ihnen verlangsamt werden. Durch die Verwendung von Chreue -Warteschlangen zur Pufferdaten zwischen überwachten Systemen und dem Compliance -System müssen Sie sich keine Sorgen über die Auswirkungen der Compliance -Aufzeichnung für Ihre überwachten Systeme machen. Auch hier kann die Chreue der Chronik Millionen von Ereignissen pro Sekunde, pro-server- und Zugriffsdaten unterstützen, die seit Jahren beibehalten werden.
Die Chreue -Warteschlange unterstützt IPC mit geringer Latenz (Inter -Prozesskommunikation) zwischen JVMs auf derselben Maschine in der Größenordnung von 1 Mikrosekunde; sowie zwischen Maschinen mit einer typischen Latenz von 10 Mikrosekunden für bescheidene Durchsätze von einigen hunderttausend. Die Chreue -Warteschlange der Chronik unterstützt Durchsatz von Millionen von Ereignissen pro Sekunde mit stabilen Mikrosekunden -Latenzen.
Siehe Artikel über die Verwendung der Chreuikwarteschlange in Microservices
Mit einer Chreue -Warteschlange kann zum Erstellen von Zustandsmaschinen verwendet werden. Alle Informationen über den Zustand dieser Komponenten können extern, ohne direkten Zugriff auf die Komponenten oder auf ihren Zustand reproduziert werden. Dies reduziert die Notwendigkeit zusätzlicher Protokollierung erheblich. Jede Protokollierung, die Sie benötigen, kann jedoch ausführlich aufgezeichnet werden. Dies macht das Ermöglichen von DEBUG
-Protokollierung in der Produktion praktisch. Dies liegt daran, dass die Kosten für die Protokollierung sehr niedrig sind; Weniger als 10 Mikrosekunden. Protokolle können zentral zur Protokollkonsolidierung repliziert werden. Die Chreue -Warteschlange wird verwendet, um mehr als 100 TB Daten zu speichern, die ab jedem Zeitpunkt wiederholt werden können.
Nicht-stapelende Streaming-Komponenten sind sehr leistungsfähig, deterministisch und reproduzierbar. Sie können Fehler reproduzieren, die erst nach einer Million Events in einer bestimmten Reihenfolge auftreten, mit beschleunigten realistischen Zeiten. Dies macht die Verwendung der Stream -Verarbeitung für Systeme attraktiv, die ein hohes Maß an Qualitätsergebnissen benötigen.
Veröffentlichungen sind auf Maven Central als:
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >
Siehe Release -Notizen von Chronicle Queue und erhalten Sie die neueste Versionsnummer. Schnappschüsse finden Sie unter https://oss.sonatype.org
Notiz | Klassen, die sich in einem der Pakete "intern", "Impl" und "Haupt" befinden (letztere enthalten verschiedene laufbare Hauptmethoden), und alle Unterverpackungen sind kein Teil der öffentlichen API und können sich bei jedem ändern. Zeit aus irgendeinem Grund . Weitere Informationen finden Sie in den jeweiligen package-info.java Dateien. |
In Chreue-Queue-Tailern sind jetzt schreibgeschützt. In Chreue Queue V4 hatten wir das Konzept der faulen Indexierung, bei dem Appierer keine Indizes schreiben würden, sondern die Indizierung durch den Tailer. Wir beschlossen, die faule Indexierung in V5 fallen zu lassen; Durch die Schaffung von Tailern nur schreibgeschützt, vereinfacht wir nicht nur die Chreuik-Warteschlange der Chronik, sondern ermöglicht es uns auch, Optimierungen an anderer Stelle im Code hinzuzufügen.
Das Verriegelungsmodell der Chreue -Warteschlange wurde in V5 geändert. In der Chreue -Warteschlange V4 befindet sich die Schreibschloss (um eine gleichzeitige Schreibvorgänge in die Warteschlange zu verhindern) in der .cq4 -Datei vorhanden. In V5 wurde dies in eine einzelne Datei namens Table Store (metadata.cq4t) verschoben. Dies vereinfacht den Verriegelungscode intern, da nur die Tabellenspeicherdatei inspiziert werden muss.
Sie können Chreue -V5 -Chreuik -Warteschlangen verwenden, um Nachrichten zu lesen, die mit Chreue -Warteschlangen V4 geschrieben wurden. Dies funktioniert jedoch nicht wireType(WireType.FIELDLESS_BINARY)
garantiert immer. Lesen Sie den Header der Warteschlange. Wir haben einige Tests für V5 -Lesen von V4 -Warteschlangen, aber diese sind begrenzt und alle Szenarien werden möglicherweise nicht unterstützt.
Sie können keine Chreue -Queue -Queue -Queue verwenden, um an Chreues -Warteschlangen von Chronicle Queue zu schreiben.
Die Chreue-Queue von Chronicle V4 ist eine vollständige Umschrift der Chronikwarteschlange, die die folgenden Probleme löst, die in V3 vorhanden waren.
Ohne selbstbeschreibende Nachrichten mussten die Benutzer ihre eigene Funktionalität zum Abwerfen von Nachrichten und zum langfristigen Speicher von Daten erstellen. Mit V4 müssen Sie dies nicht tun, aber Sie können es, wenn Sie möchten.
Die Vanille -Chronik -Warteschlange erstellt eine Datei pro Thread. Dies ist in Ordnung, wenn die Anzahl der Threads gesteuert wird. Viele Anwendungen haben jedoch wenig oder gar keine Kontrolle darüber, wie viele Threads verwendet werden und dies verursachte Usability -Probleme.
Die Konfiguration für indizierte und Vanille -Chronik war vollständig im Code, so dass der Leser die gleiche Konfiguration wie die Autoren haben musste, und es war nicht immer klar, was das war.
Es gab keine Möglichkeit für den Produzenten zu wissen, wie viele Daten auf die zweite Maschine repliziert worden waren. Die einzige Problemumgehung bestand darin, Daten an die Hersteller zurückzulegen.
Sie mussten die Datengröße angeben, um zu reservieren, bevor Sie Ihre Nachricht schreiben.
Sie mussten Ihre eigene Verriegelung für den Appender durchführen, wenn Sie indexierte Chronik verwenden.
In Chreue V3 in Chronicle war alles in Bezug auf Bytes, nicht in Draht. Es gibt zwei Möglichkeiten, Byte in der Chreue -Warteschlange V4 zu verwenden. Sie können die writeBytes
und readBytes
-Methoden verwenden oder die bytes()
aus dem Kabel abrufen. Zum Beispiel:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ()));
try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}
Chreue Enterprise Edition von Chronicle Queue ist eine kommerziell unterstützte Version unserer erfolgreichen Open Source Chronicle -Warteschlange. Die Open -Source -Dokumentation wird durch die folgenden Dokumente erweitert, um die zusätzlichen Funktionen zu beschreiben, die verfügbar sind, wenn Sie für die Enterprise Edition lizenziert sind. Diese sind:
Verschlüsselung von Nachrichtenwarteschlangen und Nachrichten. Weitere Informationen finden Sie unter Verschlüsselungsdokumentation.
TCP/IP (und optional UDP) Replikation zwischen Hosts, um die Echtzeitsicherung aller Ihrer Warteschlangendaten sicherzustellen. Weitere Informationen finden Sie in der Replikationsdokumentation. Das Warteschlangenreplikationsprotokoll wird im Replikationsprotokoll behandelt.
TimeZone -Unterstützung für die tägliche Warteschlangen -Übersetzungsplanung. Weitere Informationen finden Sie in TimeZone Support.
Async -Modusunterstützung, um eine verbesserte Leistung bei einem hohen Durchsatz bei langsameren Dateisystemen zu ermöglichen. Weitere Informationen finden Sie unter Async -Modus und auch Leistung.
Vorhergänger für verbesserte Ausreißer siehe Pre-toucher und seine Konfiguration
Darüber hinaus werden Sie von unseren technischen Experten vollständig unterstützt.
Weitere Informationen zur Chreue Enterprise Edition von Chreuics erhalten Sie von [email protected].
Eine Chreue -Warteschlange wird durch SingleChronicleQueue.class
definiert, die zur Unterstützung von:
Rollendateien täglich, wöchentlich oder stündlich,
gleichzeitige Schriftsteller auf derselben Maschine,
Gleichzeitige Leser auf derselben Maschine oder über mehrere Maschinen über TCP -Replikation (mit Chreue -Enterprise Chronicle),
Gleichzeitige Leser und Autoren zwischen Docker oder anderen Container -Workloads
Serialisierung und Deserialisierung der Kopie ohne Kopie,
Millionen von Schreibvorgängen/Lesevorgängen pro Sekunde auf Rohstoffhardware.
Ungefähr 5 Millionen Nachrichten/Sekunde für 96-Byte-Nachrichten auf einem i7-4790-Prozessor. Eine Warteschlangenverzeichnisstruktur lautet wie folgt:
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling.
Das Format besteht aus Bytes, die mit BinaryWire
oder TextWire
formatiert werden. Die Chreue der Chronik ist so konzipiert, dass sie aus dem Code gefahren werden soll. Sie können problemlos eine Schnittstelle hinzufügen, die Ihren Anforderungen entspricht.
Notiz | Aufgrund des ziemlich niedrigen Betriebsbetriebs können die Lesen/Schreibvorgänge der Chronik-Warteschlange nicht überprüfte Ausnahmen machen. Um den Tod des Threads zu verhindern, kann es praktisch sein, RuntimeExceptions zu fangen und sie gegebenenfalls zu protokollieren/zu analysieren. |
Notiz | Für Demonstrationen, wie die Chreue -Warteschlange der Chronik -Warteschlange und für die Java -Dokumentation verwendet werden kann |
In den folgenden Abschnitten führen wir zunächst eine Terminologie und eine kurze Referenz ein, um die Chreuik -Warteschlange zu verwenden. Dann bieten wir einen detaillierteren Leitfaden.
Chreue -Warteschlange ist ein anhaltendes Journal of Messages, das gleichzeitige Autoren und Leser auch über mehrere JVMs auf derselben Maschine unterstützt. Jeder Leser sieht jede Nachricht, und ein Leser kann jederzeit beitreten und trotzdem jede Nachricht sehen.
Notiz | Wir vermeiden den Begriff Verbraucher absichtlich und verwenden stattdessen Leser , da Nachrichten durch Lesen nicht konsumiert/zerstört werden. |
Die Chreue der Chronik hat die folgenden Hauptkonzepte:
Auszug
Auszug ist der Hauptdatencontainer in einer Chreue -Warteschlange. Mit anderen Worten, jede Chreus -Warteschlange besteht aus Auszügen. Das Schreiben von Nachricht an eine Chreue -Warteschlange bedeutet, einen neuen Auszug zu starten, eine Nachricht zu schreiben und den Auszug am Ende zu beenden.
Appender
Ein Appender ist die Quelle von Nachrichten; So etwas wie ein Iterator in der Chronikumgebung. Sie fügen Daten hinzu, die die aktuelle Chreuik -Warteschlange anhängen. Es kann sequentielle Schreibvorgänge durchführen, indem sie nur bis zum Ende der Warteschlange angehängt werden. Es gibt keine Möglichkeit, Auszüge einzufügen oder zu löschen.
Tailer
Ein Tailer ist ein Auszugsleser, der für sequentielle Lesevorgänge optimiert ist. Es kann sequentielle und zufällige Lesevorgänge sowohl vorwärts als auch rückwärts ausführen. Tailers lesen die nächste verfügbare Nachricht jedes Mal, wenn sie aufgerufen werden. Die Anhängerschaft sind in der Chronik -Warteschlange garantiert:
Für jeden Appender werden Nachrichten in der Reihenfolge geschrieben, die der Appender sie geschrieben hat. Nachrichten von verschiedenen Appendern sind verschachtelt,
Für jeden Tailer wird jede Nachricht für ein Thema in der gleichen Reihenfolge wie jeder andere Tailer angezeigt.
Bei der Wiederholung verfügt jede Replik über eine Kopie jeder Nachricht.
Die Chreue der Chronik ist maklerlos. Wenn Sie eine Architektur mit einem Broker benötigen, wenden Sie sich bitte an [email protected].
Datei -Rollen- und Warteschlangendateien
Die Chreue -Warteschlange ist so konzipiert, dass sie ihre Dateien je nach der Erstellung der Warteschlange rollen (siehe Rollcycles). Mit anderen Worten, eine Warteschlangendatei wird für jeden Rollzyklus erstellt, der eine Erweiterung cq4
hat. Wenn der Rollzyklus den Punkt erreicht, den er rollen sollte, schreibt Appender EOF
am Ende der aktuellen Datei, um anzuzeigen, dass kein anderer Appender in diese Datei schreiben sollte und kein Tailer weiter lesen sollte, und stattdessen sollte jeder eine neue Datei verwenden.
Wenn der Vorgang heruntergefahren und später neu gestartet wurde, wenn der Rollzyklus eine neue Datei verwenden sollte, versucht ein Appender, alte Dateien zu lokalisieren und eine EOF
-Marke in sie zu schreiben, um das Lesen des Tailers beim Lesen zu helfen.
Themen
Jedes Thema ist ein Verzeichnis von Warteschlangendateien. If you have a topic called mytopic
, the layout could look like this:
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4
Um alle Daten für einen einzigen Tag (oder Zyklus) zu kopieren, können Sie die Datei für diesen Tag für die Wiederholungstests in Ihre Entwicklungsmaschine kopieren.
Einschränkungen zu Themen und Nachrichten
Die Themen beschränken sich auf Saiten, die als Verzeichnisnamen verwendet werden können. Innerhalb eines Themas können Sie Unterthemen haben, die jeder Datentyp sein können, der serialisiert werden kann. Nachrichten können serialisierbare Daten sein.
Die Chreue der Chronik -Warteschlange unterstützt:
Serializable
Objekte, obwohl dies vermieden werden soll, da es nicht effizient ist
Externalizable
objects is preferred if you wish to use standard Java APIs.
byte[]
und String
Marshallable
; Eine selbst beschriebene Nachricht, die als Yaml, binärer Yaml oder JSON geschrieben werden kann.
BytesMarshallable
, das binäre oder Textcodierung auf niedriger Ebene ist.
Dieser Abschnitt enthält eine kurze Referenz für die Verwendung von Chreuicle -Warteschlangen, um kurz zu zeigen, wie man in eine Warteschlange erstellt, in/ausgelesen wird.
Chreue -Konstruktion von Chronik
Das Erstellen einer Instanz der Chronik -Warteschlange unterscheidet sich von nur einem Konstruktor. Um eine Instanz zu erstellen, müssen Sie den ChronicleQueueBuilder
verwenden.
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build ();
In diesem Beispiel haben wir eine IndexedChronicle
erstellt, die zwei RandomAccessFiles
erstellt; eine für Indizes und eine für Daten mit Namen relativ:
${java.io.tmpdir}/getting-started/{today}.cq4
Schreiben in eine Warteschlange
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
Lesen aus einer Warteschlange
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
Außerdem kann die Methode von ChronicleQueue.dump()
verwendet werden, um den Rohinhalt als Zeichenfolge abzuwerfen.
queue . dump ();
Aufräumarbeiten
Chronicle Queue stores its data off-heap, and it is recommended that you call close()
once you have finished working with Chronicle Queue, to free resources.
Notiz | Wenn Sie dies tun, gehen keine Daten verloren. Dies dient nur dazu, Ressourcen zu reinigen, die verwendet wurden. |
queue . close ();
Alles zusammenstellen
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
}
Sie können eine Chreus -Warteschlange mit seinen Konfigurationsparametern oder Systemeigenschaften konfigurieren. Darüber hinaus gibt es verschiedene Möglichkeiten zum Schreiben/Lesen in MethodWriter
aus einer Warteschlange, z MethodReader
Die Chreue -Warteschlange (CQ) kann über eine Reihe von Methoden in der SingleChronicleQueueBuilder
-Klasse konfiguriert werden. Einige der Parameter, die von unseren Kunden am stärksten abgefragt wurden, werden unten erklärt.
Rollcycle
Der RollCycle
-Parameter konfiguriert die Rate, mit der CQ die zugrunde liegenden Warteschlangendateien rollt. Beispielsweise führt die Verwendung des folgenden Code -Snippets dazu, dass die Warteschlangendateien (dh eine neue Datei erstellt) jede Stunde gerollt werden:
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build ()
Sobald der Rollzyklus einer Warteschlange festgelegt wurde, kann er zu einem späteren Zeitpunkt nicht geändert werden. Alle weiteren Instanzen von SingleChronicleQueue
, die für die Verwendung desselben Pfads konfiguriert sind, sollten so konfiguriert werden, dass derselbe Rollzyklus verwendet wird. Wenn dies nicht der Fall ist, wird der Rollzyklus so aktualisiert, dass er dem persistierten Rollzyklus entspricht. In diesem Fall wird eine Warnprotokollnachricht gedruckt, um den Bibliotheksbenutzer über die Situation zu informieren:
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}
Konsolenausgabe:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.
Die maximale Anzahl von Nachrichten, die in einer Warteschlangendatei gespeichert werden können, hängt vom Rollenzyklus ab. Weitere Informationen dazu finden Sie in den FAQ.
In der Chreuicle -Warteschlange basiert die Rollover -Zeit auf UTC. Die Funktion "TimeZone Rollover Enterprise" erweitert die Fähigkeit der Chreuik -Queue, die Zeit und Periodizität von Warteschlangen -Rollovers anstelle von UTC anzugeben. Weitere Informationen finden Sie in TimeZone Warteschlange.
Die FileUtil
-Klasse von Chronicle -Queues bietet nützliche Methoden zum Verwalten von Warteschlangendateien. Siehe direktes Verwalten von Rolldateien.
Abhören
It's possible to configure how Chronicle Queue will store the data by explicitly set the WireType
:
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )
Zum Beispiel:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )
Obwohl es möglich ist, bei der Erstellung eines Bauunternehmens explizit einen Abhöre zu bieten, wird er entmutigt, da noch nicht alle Drahttypen von Chreuicle -Warteschlangen unterstützt werden. Insbesondere werden die folgenden Drahttypen nicht unterstützt:
Text (und im Wesentlichen alle basierend auf Text, einschließlich JSON und CSV)
ROH
Read_any
blockieren
Wenn eine Warteschlange gelesen/geschrieben wird, wird ein Teil der derzeit gelesenen/geschriebenen Datei einem Speichersegment zugeordnet. Dieser Parameter steuert die Größe des Speicherzuordnungsblocks. Sie können diesen Parameter mithilfe der Methode SingleChronicleQueueBuilder.blockSize(long blockSize)
ändern, falls dies erforderlich ist.
Notiz | Sie sollten es vermeiden, blockSize unnötig zu wechseln. |
Wenn Sie große Nachrichten senden, sollten Sie eine große blockSize
einstellen, dh die blockSize
sollte mindestens das Vierfache der Nachrichtengröße betragen.
Warnung | Wenn Sie kleine blockSize für große Nachrichten verwenden, erhalten Sie eine IllegalStateException und das Schreiben wird abgebrochen. |
Wir empfehlen, dass Sie für jede Warteschlangeninstanz dasselbe blockSize
für jede Warteschlange verwenden, wenn Sie Warteschlangen replizieren. Das blockSize
wird nicht in die Metadaten der Warteschlange geschrieben. Sie sollten daher idealerweise auf denselben Wert festgelegt werden to run with a different blocksize
you can).
Tipp | Verwenden Sie für jede Instanz der replizierten Warteschlangen dasselbe blockSize . |
Indexspazierungen
Dieser Parameter zeigt den Raum zwischen explizit indizierten Auszügen. Eine höhere Zahl bedeutet eine höhere sequentielle Schreibleistung, aber langsamer zufälliger Zugriffslese. Die sequentielle Leseleistung wird von dieser Eigenschaft nicht beeinflusst. Beispielsweise kann der folgende Standard -Indexabstand zurückgegeben werden:
16 (minutiös)
64 (täglich)
Sie können diesen Parameter mit der Methode SingleChronicleQueueBuilder.indexSpacing(int indexSpacing)
ändern.
IndexCount
Die Größe jedes Indexarrays sowie die Gesamtzahl der Indexarrays pro Warteschlangendatei.
Notiz | IndexCount 2 ist die maximale Anzahl indizierter Warteschlangeneinträge. |
Notiz | Weitere Informationen und Beispiele für die Verwendung von Indizes finden Sie in der Abschnittsauszugs -Indexierung in der Chronik -Warteschlange dieser Benutzerhandbuch. |
ReadBuffermode, writeBuffermode
Diese Parameter definieren Puffermode für Lese- oder Schreibvorgänge, die die folgenden Optionen haben:
None
- die Standardeinstellung (und der einzige, der für Open -Source -Benutzer verfügbar ist), keine Pufferung;
Copy
- in Verbindung mit Verschlüsselung verwendet;
Asynchronous
- Verwenden Sie beim Lesen und/oder Schreiben einen asynchronen Puffer, der im Chronicle Async -Modus bereitgestellt wird.
BufferCapacity
RingBuffer -Kapazität in Bytes bei der Verwendung bufferMode: Asynchronous
In der Chreuicle -Warteschlange beziehen wir uns auf das Schreiben Ihrer Daten an die Chreuicle -Warteschlange, um einen Auszug zu speichern. Diese Daten können aus jedem Datentyp bestehen, einschließlich Text, Zahlen oder serialisierten Blobs. Letztendlich werden alle Ihre Daten, unabhängig davon, was sie ist, als eine Reihe von Bytes gespeichert.
Kurz bevor Sie Ihren Auszug aufbewahren, behält sich Chreue Queue einen 4-Byte-Header vor. Chreue der Chronik schreibt die Länge Ihrer Daten in diesen Header. Auf diese Weise weiß es, wie lange jeder Datenblock ist, wenn die Chreue -Warteschlange der Chronik zum Lesen Ihres Auszugs gelesen wird. Wir bezeichnen diesen 4-Byte-Header zusammen mit Ihrem Auszug als Dokument. Streng genommen können Chreus der Chronik verwendet werden, um Dokumente zu lesen und zu schreiben.
Notiz | Innerhalb dieses 4-Byte-Headers reservieren wir auch ein paar Bits für eine Reihe interner Operationen, z. B. das Verriegelungen, um sowohl Prozessoren als auch Threads über die Thread-Sicherheit von Chronicle-Warteschlangen zu gestalten. Das Wichtigste ist, dass Sie die 4 Bytes aus diesem Grund nicht strikt in eine Ganzzahl umwandeln können, um die Länge Ihres Datenblobs zu finden. |
Wie bereits erwähnt, verwendet die Chreue -Warteschlange mit einem Appender in die Warteschlange und einen Tailer , um aus der Warteschlange zu lesen. Im Gegensatz zu anderen Java -Warteschlangenlösungen gehen Nachrichten nicht verloren, wenn sie mit einem Tailer gelesen werden. Dies wird im Abschnitt unten ausführlicher über "Lesen einer Warteschlange mit einem Tailer" detaillierter behandelt. Um Daten in eine Chreue -Warteschlange zu schreiben, müssen Sie zunächst einen Appender erstellen:
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}
Die Chreue-Warteschlange verwendet die folgende Schnittstelle auf niedriger Ebene, um die Daten zu schreiben:
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
}
Das Schließen der Try-with-Ressourcen ist der Punkt, an dem die Länge der Daten in den Header geschrieben wird. Sie können auch den DocumentContext
verwenden, um den Index zu ermitteln, dass Ihre Daten gerade zugewiesen wurden (siehe unten). Sie können diesen Index später verwenden, um diesen Auszug zu bewegen/nachzuschlagen. Jeder Chreue -Auszug der Chronik hat einen eindeutigen Index.
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
}
Die folgenden hochrangigen Methoden wie writeText()
sind Convenience-Methoden zum Aufrufen appender.writingDocument()
, aber beide Ansätze tun im Wesentlichen dasselbe. Der tatsächliche Code von writeText(CharSequence text)
sieht so aus:
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}
Sie haben also die Auswahl einer Reihe von Schnittstellen auf hoher Ebene bis hin zu einer API auf niedriger Ebene bis hin zu rohen Speicher.
Dies ist die API auf höchster Ebene, die die Tatsache verbirgt, dass Sie überhaupt mit Nachrichten schreiben. Der Vorteil ist, dass Sie Anrufe mit einer realen Komponente oder einer Schnittstelle zu einem anderen Protokoll an die Schnittstelle tauschen können.
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));
Sie können eine "selbstbeschreibende Nachricht" schreiben. Solche Nachrichten können Schemaänderungen unterstützen. Sie sind auch leichter zu verstehen, wenn Probleme debuggen oder diagnostiziert werden.
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));
Sie können "Rohdaten" schreiben, die sich selbst beschreiben. Die Typen sind immer korrekt; Position ist der einzige Hinweis auf die Bedeutung dieser Werte.
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));
Sie können "Rohdaten" schreiben, die nicht selbst beschreiben. Ihr Leser muss wissen, was diese Daten bedeutet und welche verwendet wurden, die verwendet wurden.
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));
Im Folgenden wird der niedrigste Weg zum Schreiben von Daten dargestellt. Sie erhalten eine Adresse zum RAW -Speicher und können schreiben, was Sie wollen.
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});
Sie können den Inhalt der Warteschlange drucken. Sie können die ersten beiden sehen und die letzten beiden Nachrichten speichern die gleichen Daten.
// dump the content of the queue
System . out . println ( queue . dump ());
Drucke:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World
Das Lesen der Warteschlange folgt dem gleichen Muster wie das Schreiben, außer dass die Möglichkeit besteht, dass es keine Nachricht gibt, wenn Sie versuchen, es zu lesen.
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
}
Sie können jede Nachricht in einen Methodenaufruf basieren, der auf dem Inhalt der Nachricht basiert, und lassen Sie die Argumente der Methode automatisch deserialisieren. Das Aufrufen von reader.readOne()
überspringt automatisch alle Nachrichten, die nicht mit Ihrem Methodenleser übereinstimmen.
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());
Sie können die Nachricht selbst dekodieren.
Notiz | Die Namen, Typ und Reihenfolge der Felder müssen nicht übereinstimmen. |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));
Sie können selbstbeschreibende Datenwerte lesen. Dadurch werden die Typen korrekt und nach Bedarf konvertieren.
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));
Sie können Rohdaten als Primitive und Zeichenfolgen lesen.
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));
Sie können die zugrunde liegende Speicheradresse erhalten und auf den nativen Speicher zugreifen.
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
}));
Notiz | Jeder Tailer sieht jede Nachricht. |
Filtermeldungen kann eine Abstraktion hinzugefügt werden oder nur einem Nachrichtenprozessor Nachrichten zuweisen. Im Allgemeinen benötigen Sie jedoch nur einen Haupt -Tailer für ein Thema, mit möglicherweise einigen unterstützenden Abschnitten für die Überwachung usw.
Da die Chreue -Warteschlange in der Chronik nicht die Themen aufteilt, erhalten Sie die Gesamtbestellung aller Nachrichten in diesem Thema. Bei Themen gibt es keine Garantie für die Bestellung. Wenn Sie bestimmt aus einem System, das aus mehreren Themen konsumiert, bestimmt wiederholen möchten, empfehlen wir, die Ausgabe dieses Systems wiederzugeben.
Chreue -Tailer von Chronicle können Dateihandler erstellen, die Dateihandler werden gereinigt, wenn die zugehörige Methode der Chreuik close()
aufgerufen wird oder wenn das JVM eine Müllsammlung ausführt. Wenn Sie Ihren Code schreiben, verfügen Sie nicht über GC -Pausen und möchten die Dateihandler ausdrücklich aufräumen, können Sie Folgendes anrufen:
(( StoreTailer ) tailer ). releaseResources ()
ExcerptTailer.toEnd()
In einigen Anwendungen kann es erforderlich sein, vom Ende der Warteschlange vom Ende der Warteschlange zu lesen (z. B. in einem Neustartszenario). Für diesen Anwendungsfall liefert ExcerptTailer
die toEnd()
-Methode. Wenn die Tailer -Richtung standardmäßig FORWARD
oder von toEnd()
ExcerptTailer.direction
festgelegt wird. In diesem Fall ist der Tailer nun bereit, neue Datensätze zu lesen, die der Warteschlange angehängt sind. Bis alle neuen Nachrichten an die Warteschlange beigefügt sind, gibt es keinen neuen DocumentContext
für das Lesen:
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();
Wenn es notwendig ist, vom Ende rückwärts durch die Warteschlange zu lesen, kann der Tailer so eingestellt werden, dass sie rückwärts lesen:
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd ();
Beim Rückwärtslesen verschiebt die toEnd()
-Methode den Tailer in die letzte Aufzeichnung in der Warteschlange. Wenn die Warteschlange nicht leer ist, gibt es einen DocumentContext
zum Lesen:
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();
AKA namens Tailers.
Es kann nützlich sein, einen Tailer zu haben, der von der Stelle, an der es bis zum Neustart der Anwendung war, fortgesetzt wird.
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}
Tailer "A" Last liest Nachricht 2
Tailer "a" next reads message 3
Tailer "b" last reads message 0
Tailer "b" next reads message 1
This is from the RestartableTailerTest
where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart()
, toEnd()
, moveToIndex()
or reads a message.
Notiz | The direction() is not preserved across restarts, only the next index to be read. |
Notiz | The index of a tailer is only progressed when the DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4
:
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
This can often be a bit difficult to read, so it is better to dump the cq4
files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain
or net.openhft.chronicle.queue.ChronicleReaderMain
. DumpMain
performs a simple dump to the terminal while ChronicleReaderMain
handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4
file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar
, from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
Tipp | To find out which version of jars to include please, refer to the chronicle-bom . |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
This will dump the 19700101-02.cq4
file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining
Notiz | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh
located in the Chonicle-Queue/bin
-folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/dump_queue.sh <file path>
ChronicleReaderMain
The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain
(in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain
:
usage: ChronicleReaderMain -a <binary-arg> Argument to pass to binary search class -b <binary-search> Use this class as a comparator to binary search -cbl <content-based-limiter> Specify a content-based limiter -cblArg <content-based-limiter-argument> Specify an argument for use by the content-based limiter -d <directory> Directory containing chronicle queue files -e <exclude-regex> Do not display records containing this regular expression -f Tail behaviour - wait for new records to arrive -g Show message history (when using method reader) -h Print this help and exit -i <include-regex> Display records containing this regular expression -k Read the queue in reverse -l Squash each output message into a single line -m <max-history> Show this many records from the end of the data set -n <from-index> Start reading from this index (eg 0x123ABE) -named <named> Named tailer ID -r <as-method-reader> Use when reading from a queue generated using a MethodWriter -s Display index -w <wire-type> Control output ie JSON -x <max-results> Limit the number of results to output -z Print timestamps using the local timezone
Just as with DumpQueue
you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build >
Once the Uber jar is present, you can run ChronicleReaderMain
from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh
which again is located in the Chonicle-Queue/bin
-folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/queue_reader.sh <options>
ChronicleWriter
If using MethodReader
and MethodWriter
then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain
or the shell script queue_writer.sh
eg
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method name
If you want to write to the below "doit" method
public interface MyInterface {
void doit ( DTO dto );
}
public class DTO extends SelfDescribingMarshallable { private int age; private String name; }
Then you can call ChronicleWriterMain -d queue doit x.yaml
with either (or both) of the below Yamls:
{
age : 19,
name : Henry
}
oder
!x.y.z.DTO {
age : 42,
name : Percy
}
If DTO
makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface
, where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}
To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));
These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}
To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());
Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end over across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId
of the queue.
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));
A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}
First write
First read
Write of the result of the read.
What triggered this event.
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender();
the acquireAppender()
uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();
will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()
Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
Notiz | You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
for more information on this see :
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
Move to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
}
You can add a StoreFileListener
to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY
cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber
providing up to 4 billion entries per day. if more messages per day are anticipated, the XLARGE_DAILY
cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor
class) as the JVM can crash when allocating a new memory mapped file if disk space becomes low enough. The disk space monitor checks (for each FileStore you are using Chronicle Queues on): that there is less than 200MB free. If so you will see:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );
otherwise it will check for the threshold percentage and log out this message:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );
The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously Chronicle Queue stores its data off-heap in a '.cq4' file. So whenever you wish to append data to this file or read data into this file, chronicle queue will create a file handle . Typically, Chronicle Queue will create a new '.cq4' file every day. However, this could be changed so that you can create a new file every hour, every minute or even every second.
If we create a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, If you had written 10 seconds worth of data and then you wish to read this data, chronicle would have to scan across 10 files. To reduce the creation of the file handles, chronicle queue cashes them lazily and when it comes to writing data to the queue files, care-full consideration must be taken when closing the files, because on most OS's a close of the file, will force any data that has been appended to the file, to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher
is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute()
method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown()
method can be called to close the supplied queue and release any other resources. Invocation of the execute()
method after shutdown()
has been called will cause an IllegalStateException
to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle
(defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
Warnung | earlyAcquireNextCycle is off by default and if it is going to be turned on, you should very carefully stress test before and after turning it on. Basically what you experience is related to your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs
(defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs
.
SingleChronicleQueueExcerpts.dontWrite
(defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle
from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle
is set to true
and pretoucherPrerollTimeMs
to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" );
The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute()
method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
}
The method close()
, closes the Pretoucher and releases its resources.
pretouch . close ();
Notiz | The Pretoucher is an Enterprise feature |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
Notiz | There is a step in latency at around 10 million message per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument()
is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument()
call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close()
is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true);
Notiz | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException . This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
Results:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark
), YMMV - always do your own benchmarks:
1 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 90.40 90.59 91.17 0.42 90: 179.52 180.29 97.50 36.14 99: 187.33 186.69 186.82 0.05 99.7: 213.57 198.72 217.28 5.86 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 179.14 179.26 180.93 0.62 90: 183.49 183.36 185.92 0.92 99: 192.19 190.02 215.49 8.20 99.7: 240.70 228.16 258.88 8.24 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 86.05 85.60 86.24 0.50 90: 170.18 169.79 170.30 0.20 99: 176.83 176.58 177.09 0.19 99.7: 183.36 185.92 183.49 0.88 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.24 85.98 86.11 0.10 90: 89.89 89.44 89.63 0.14 99: 169.66 169.79 170.05 0.10 99.7: 175.42 176.32 176.45 0.05 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 691.46 699.65 701.18 0.15 90: 717.57 722.69 721.15 0.14 99: 752.90 748.29 748.29 0.00 99.7: 1872.38 1743.36 1780.22 1.39 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 350.59 353.66 353.41 0.05 90: 691.46 701.18 697.60 0.34 99: 732.42 733.95 729.34 0.42 99.7: 1377.79 1279.49 1302.02 1.16 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 342.40 344.96 344.45 0.10 90: 357.25 360.32 359.04 0.24 99: 688.38 691.97 691.46 0.05 99.7: 1376.77 1480.19 1383.94 4.43 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 343.68 345.47 346.24 0.15 90: 360.06 362.11 363.14 0.19 99: 694.02 698.62 699.14 0.05 99.7: 1400.32 1510.91 1435.14 3.40 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain
This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop
, you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 25 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 21 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 14 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal()
if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint
into the code because thread dumps are only reported at safe-points.
Results:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | Throughput |
< 30 microseconds 99.3% of the time | 7 million message per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 milliseconds 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million message per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.