https://player.vimeo.com/video/201989439
Chronicle Queue es un marco de mensajería de baja latencia persistido para aplicaciones de alto rendimiento.
Este proyecto cubre la versión Java de Chronicle Queue. También está disponible una versión C ++ de este proyecto y admite la interoperabilidad Java/C ++ más vinculaciones de lenguaje adicionales, por ejemplo, Python. Si está interesado en evaluar la versión C ++, comuníquese con [email protected].
A primera vista, la cola de Chronicle puede verse como simplemente otra implementación de la cola . Sin embargo, tiene las principales opciones de diseño que deben enfatizarse. Utilizando el almacenamiento fuera del montón , Chronicle Queue proporciona un entorno donde las aplicaciones no sufren de recolección de basura (GC). Al implementar aplicaciones de alto rendimiento y de memoria intensiva (¿escuchó el término elegante "BigData"?) En Java, uno de los mayores problemas es la recolección de basura.
La cola de Chronicle permite agregar mensajes al final de una cola ("agregado"), leer desde la cola ("cola") y también admite la búsqueda de acceso aleatorio.
Puede considerar que una cola de Chronicle es similar a un tema duradero/persistido sin corredor de baja latencia que puede contener mensajes de diferentes tipos y tamaños. La cola de Chronicle es una cola persistida no resistente distribuida que:
Admite RMI asíncrono y publica/suscribe interfaces con latencias de microsegundos.
pasa mensajes entre JVM en debajo de un microsegundo
pasa mensajes entre JVM en diferentes máquinas mediante replicación en menos de 10 microsegundos (función empresarial)
proporciona latencias estables y suaves en tiempo real en los millones de mensajes por segundo para un solo hilo a una cola; con el pedido total de cada evento.
Al publicar mensajes de 40 bytes, un alto porcentaje del tiempo logramos latencias con menos de 1 microsegundo. La latencia del percentil 99 es la peor 1 en 100, y el percentil 99.9 es el peor 1 en 1000 latencia.
Tamaño por lotes | 10 millones de eventos por minuto | 60 millones de eventos por minuto | 100 millones de eventos por minuto |
---|---|---|---|
99%Ile | 0.78 µs | 0.78 µs | 1.2 µs |
99.9%ILE | 1.2 µs | 1.3 µs | 1.5 µs |
Tamaño por lotes | 10 millones de eventos por minuto | 60 millones de eventos por minuto | 100 millones de eventos por minuto |
---|---|---|---|
99%Ile | 20 µs | 28 µs | 176 µs |
99.9%ILE | 901 µs | 705 µs | 5.370 µs |
Nota | 100 millones de eventos por minuto envían un evento cada 660 nanosegundos; replicado y persistido. |
Importante | Este rendimiento no se logra utilizando un gran grupo de máquinas . Esto está usando un hilo para publicar, y un hilo para consumir. |
La cola de Chronicle está diseñada para:
Sea una "tienda de registro todo" que puede leer con latencia en tiempo real de microsegundos. Esto admite incluso los sistemas de comercio de alta frecuencia más exigentes. Sin embargo, se puede utilizar en cualquier aplicación donde la grabación de información sea una preocupación.
Admite una replicación confiable con notificación al appender (escritor de mensajes) o un tailer (lector del mensaje), cuando un mensaje se ha replicado con éxito.
La cola de Chronicle supone que el espacio en el disco es barato en comparación con la memoria. La cola de Chronicle hace uso completo del espacio en disco que tiene, por lo que no está limitado por la memoria principal de su máquina. Si usa HDD spinning, puede almacenar muchas cucharadas de espacio en disco para un pequeño costo.
El único software adicional que Chronicle Queue necesita ejecutar es el sistema operativo. No tiene un corredor; En su lugar, utiliza su sistema operativo para hacer todo el trabajo. Si su aplicación muere, el sistema operativo sigue funcionando durante segundos más, por lo que no se pierden datos; Incluso sin replicación.
A medida que la cola de Chronicle almacena todos los datos guardados en archivos mapeados de memoria, esto tiene una sobrecarga trivial en el condado, incluso si tiene más de 100 TB de datos.
Chronicle puso un esfuerzo significativo para lograr una latencia muy baja. En otros productos que se centran en el soporte de aplicaciones web, las latencias de menos de 40 milisegundos están bien, ya que son más rápidas de lo que puede ver; Por ejemplo, la velocidad de cuadro del cine es de 24 Hz, o aproximadamente 40 ms.
La cola de Chronicle tiene como objetivo lograr latencias de menos de 40 microsegundos para 99% a 99.99% del tiempo. Utilizando la cola de Chronicle sin replicación, admitimos aplicaciones con latencias por debajo de 40 microsegundos de extremo a extremo en múltiples servicios. A menudo, la latencia del 99% de la cola crónica depende completamente de la elección del sistema operativo y el subsistema de disco duro.
La replicación para la cola de Chronicle admite la empresa de cable de cable. Esto admite una compresión en tiempo real que calcula los deltas para objetos individuales, como están escritos. Esto puede reducir el tamaño de los mensajes en un factor de 10, o mejor, sin la necesidad de lotes; es decir, sin introducir una latencia significativa.
La cola de Chronicle también admite la compresión LZW, Snappy y GZIP. Sin embargo, estos formatos agregan latencia significativa. Estos solo son útiles si tiene limitaciones estrictas en el ancho de banda de la red.
La cola de Chronicle admite una serie de semánticas:
Cada mensaje se reproduce en reiniciar.
Solo se reproducen nuevos mensajes en reiniciar.
Reiniciar desde cualquier punto conocido utilizando el índice de la entrada.
Reproducir solo los mensajes que se ha perdido. Esto se admite directamente utilizando el MethodReader/MethodWriter Builders.
En la mayoría de los sistemas System.nanoTime()
es aproximadamente el número de nanosegundos desde que el sistema se reinició por última vez (aunque diferentes JVM pueden comportarse de manera diferente). Esto es lo mismo en JVM en la misma máquina, pero muy diferente entre las máquinas. La diferencia absoluta cuando se trata de máquinas no tiene sentido. Sin embargo, la información se puede utilizar para detectar valores atípicos; No puede determinar cuál es la mejor latencia, pero puede determinar qué tan lejos se encuentra las mejores latencias. Esto es útil si se está centrando en las latencias del percentil 99. Tenemos una clase llamada RunningMinimum
para obtener horarios de diferentes máquinas, al tiempo que compensan una deriva en la nanoTime
entre máquinas. Cuanto más a menudo tome medidas, más preciso es este mínimo de ejecución.
La cola de Chronicle gestiona el almacenamiento por ciclo. Puede agregar un StoreFileListener
que le notificará cuándo se agrega un archivo y cuándo ya no se retiene. Puede moverse, comprimir o eliminar todos los mensajes durante un día, a la vez. Nota: Desafortunadamente, en Windows, si se interrumpe una operación IO, puede cerrar el fileanal de fila subyacente.
Debido a las razones de rendimiento, hemos eliminado la verificación de interrupciones en el código de cola Chronicle. Debido a esto, le recomendamos que evite usar la cola de Chronicle con código que genera interrupciones. Si no puede evitar generar interrupciones, le sugerimos que cree una instancia separada de la cola de Chronicle por hilo.
La cola de Chronicle se usa con mayor frecuencia para sistemas centrados en el productor, donde necesita retener muchos datos durante días o años. Para estadísticas, consulte el uso de Crónica
Importante | Chronicle Queue no admite operar ningún sistema de archivos de red, ya sea NFS, AFS, almacenamiento basado en SAN o cualquier otra cosa. La razón de esto es que aquellos sistemas de archivos no proporcionan todas las primitivas requeridas para los archivos mapeados de memoria que utiliza la cola. Si se necesita red de red (por ejemplo, para que los datos sean accesibles para múltiples hosts), la única forma compatible es la replicación de la cola de crónica (función empresarial). |
La mayoría de los sistemas de mensajería están centrados en el consumidor. El control de flujo se implementa para evitar que el consumidor se sobrecargue; incluso momentáneamente. Un ejemplo común es un servidor que admite múltiples usuarios de la GUI. Esos usuarios pueden estar en diferentes máquinas (sistema operativo y hardware), diferentes cualidades de red (latencia y ancho de banda), haciendo una variedad de otras cosas en diferentes momentos. Por esta razón, tiene sentido que el consumidor del cliente le diga al productor cuándo retroceder, retrasando cualquier dato hasta que el consumidor esté listo para tomar más datos.
Chronicle Queue es una solución centrada en el productor y hace todo lo posible para nunca retroceder al productor, o decirle que disminuya la velocidad. Esto lo convierte en una herramienta poderosa, que proporciona un gran búfer entre su sistema y un productor aguas arriba sobre el cual tiene poco o no control.
Los editores de datos del mercado no le dan la opción de retroceder al productor por mucho tiempo; Si es todo. Algunos de nuestros usuarios consumen datos de CME OPRA. Esto produce picos de 10 millones de eventos por minuto, enviados como paquetes UDP sin ningún reintento. Si se pierde o suelta un paquete, entonces se pierde. Debe consumir y grabar esos paquetes tan rápido como llegan a usted, con muy poco almacenamiento en búfer en el adaptador de red. Para los datos del mercado en particular, en tiempo real significa en unos pocos microsegundos ; No significa Introdader (durante el día).
La cola de Chronicle es rápida y eficiente, y se ha utilizado para aumentar la velocidad que los datos se pasan entre los hilos. Además, también mantiene un registro de cada mensaje pasado, lo que le permite reducir significativamente la cantidad de registro que debe hacer.
Los sistemas de cumplimiento son requeridos por más y más sistemas en estos días. Todos tienen que tenerlos, pero nadie quiere ser ralentizado por ellos. Al usar la cola de Chronicle para amortiguar los datos entre los sistemas monitoreados y el sistema de cumplimiento, no debe preocuparse por el impacto del registro de cumplimiento para sus sistemas monitoreados. Una vez más, la cola de Chronicle puede apoyar a millones de eventos por segundo, por servir y datos de acceso que se han retenido durante años.
La cola de Chronicle admite IPC de baja latencia (comunicación entre procesos) entre JVM en la misma máquina en el orden de magnitud de 1 microsegundo; así como entre máquinas con una latencia típica de 10 microsegundos para rendimientos modestos de unos pocos cientos de miles. La cola de Chronicle admite el rendimiento de millones de eventos por segundo, con latencias de microsegundos estables.
Ver artículos sobre el uso de la cola de Chronicle en microservicios
Se puede usar una cola de Chronicle para construir máquinas de estado. Toda la información sobre el estado de esos componentes puede reproducirse externamente, sin acceso directo a los componentes o a su estado. Esto reduce significativamente la necesidad de registro adicional. Sin embargo, cualquier registro que necesite se puede grabar con gran detalle. Esto hace que el registro DEBUG
permitida en la producción sea práctica. Esto se debe a que el costo del registro es muy bajo; Menos de 10 microsegundos. Los registros se pueden replicar centralmente para la consolidación de registro. La cola de Chronicle se está utilizando para almacenar más de 100 TB de datos, que se pueden reproducir desde cualquier punto en el tiempo.
Los componentes de transmisión que no son de lotes son altamente performantes, deterministas y reproducibles. Puede reproducir errores que solo aparecen después de un millón de eventos que se reproducen en un orden particular, con tiempos realistas acelerados. Esto hace que el uso del procesamiento de flujo sea atractivo para sistemas que necesitan un alto grado de resultados de calidad.
Los lanzamientos están disponibles en Maven Central como:
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >
Vea las notas de la versión de la cola de Chronicle y obtenga el último número de versión. Las instantáneas están disponibles en https://oss.sonatype.org
Nota | Clases que residen en cualquiera de los paquetes 'internos', 'impl' y 'principales' (este último que contiene varios métodos principales ejecutables) y cualquier subackage no forman parte de la API pública y puede estar sujeto a cambios en cualquier tiempo por cualquier motivo . Consulte los archivos respectivos package-info.java para más detalles. |
En Chronicle Queue V5 Tailers ahora son de solo lectura, en Chronicle Queue V4 teníamos el concepto de indexación perezosa, donde los apéndices no escribirían índices, sino que el tasador podría hacer la indexación. Decidimos lanzar la indexación perezosa en V5; Hacer sellos de solo lectura no solo simplifica la cola de Chronicle, sino que también nos permite agregar optimizaciones en otras partes del código.
El modelo de bloqueo de la cola de Chronicle se cambió en V5, en la cola de Chronicle V4, el bloqueo de escritura (para evitar escrituras concurrentes en la cola) existe en el archivo .cq4. En V5, esto se movió a un solo archivo llamado Table Store (metadata.cq4t). Esto simplifica el código de bloqueo internamente, ya que solo el archivo de almacenamiento de la tabla debe inspeccionarse.
Puede usar Chronicle Queue V5 para leer mensajes escritos con CHRONICLE Queue V4, pero esto no se garantiza que siempre funcione, si, por ejemplo, creó su cola V4 con wireType(WireType.FIELDLESS_BINARY)
y luego Chronicle Queue V5 no podrá Lea el encabezado de la cola. Tenemos algunas pruebas para las colas de V5 Reading V4, pero estas son limitadas y todos los escenarios pueden no ser compatibles.
No puede usar la cola de Chronicle V5 para escribir en las colas de la cola V4 de Chronicle.
Chronicle Queue V4 es una reescritura completa de la cola de Chronicle que resuelve los siguientes problemas que existieron en V3.
Sin mensajes autodescritos, los usuarios tuvieron que crear su propia funcionalidad para descargar mensajes y almacenamiento a largo plazo de datos. Con V4 no tiene que hacer esto, pero puede si lo desea.
La cola de vainilla Chronicle crearía un archivo por hilo. Esto está bien si el número de subprocesos está controlado, sin embargo, muchas aplicaciones tienen poco o ningún control sobre cuántos hilos se usan y esto causó problemas de usabilidad.
La configuración para la crónica indexada y de vainilla estaba completamente en código, por lo que el lector tenía que tener la misma configuración que los escritores y no siempre estuvo claro qué era eso.
No había forma de que el productor supiera cuántos datos se habían replicado a la segunda máquina. La única solución era replicar datos a los productores.
Necesitaba especificar el tamaño de los datos para reservar antes de comenzar a escribir su mensaje.
Necesitaba hacer su propio bloqueo para el apéndice cuando se usa Chronicle indexado.
En Chronicle Queue V3, todo estaba en términos de bytes, no de alambre. Hay dos formas de usar byte en la cola crónica V4. Puede usar los métodos writeBytes
y readBytes
, o puede obtener los bytes()
del cable. Por ejemplo:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ()));
try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}
Chronicle Queue Enterprise Edition es una versión comercialmente compatible de nuestra exitosa cola de crónica de código abierto. La documentación de código abierto se extiende por los siguientes documentos para describir las características adicionales disponibles cuando tiene licencia para Enterprise Edition. Estos son:
Cifrado de colas y mensajes de mensajes. Para más información, consulte Documentación de cifrado.
Replicación TCP/IP (y opcionalmente UDP) entre hosts para garantizar una copia de seguridad en tiempo real de todos los datos de la cola. Para obtener más información, consulte la documentación de replicación, el protocolo de replicación de la cola se cubre en el protocolo de replicación.
Soporte de zona horaria para la programación diaria de reinversión de la cola. Para obtener más información, consulte Soporte de zona horaria.
Soporte del modo Async para dar un rendimiento mejorado en un alto rendimiento en sistemas de archivos más lentos. Para obtener más información, consulte el modo Async y también el rendimiento.
Pre-toucher para obtener valores atípicos mejorados, ver pre-toucher y su configuración
Además, nuestros expertos técnicos lo respaldarán plenamente.
Para obtener más información sobre Chronicle Queue Enterprise Edition, comuníquese con [email protected].
Una cola de Chronicle se define por SingleChronicleQueue.class
que está diseñada para soportar:
rodar archivos diariamente, semanalmente o por hora,
escritores concurrentes en la misma máquina,
Lectores concurrentes en la misma máquina o a través de múltiples máquinas a través de la replicación TCP (con la empresa de la cola crónica),
Lectores y escritores concurrentes entre Docker u otras cargas de trabajo contenedores
serialización de copia cero y deserialización,
Millones de escrituras/lecturas por segundo en hardware de productos básicos.
Aproximadamente 5 millones de mensajes/segundo para mensajes de 96 bytes en un procesador i7-4790. Una estructura de directorio de cola es la siguiente:
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling.
El formato consiste en bytes prefijado de tamaño que se formatean usando BinaryWire
o TextWire
. La cola de Chronicle está diseñada para ser expulsada del código. Puede agregar fácilmente una interfaz que se adapte a sus necesidades.
Nota | Debido a la operación de bajo nivel, las operaciones de lectura/escritura de la cola de Chronicle pueden arrojar excepciones sin control. Para evitar la muerte del hilo, podría ser práctico capturar RuntimeExceptions y registrarlas/analizarlas según corresponda. |
Nota | Para obtener manifestaciones de cómo se puede usar la cola de Chronicle, consulte la demo de la cola de la crónica y para la documentación de Java, consulte la cola de crónica Javadocs |
En las siguientes secciones, primero presentamos algo de terminología y una referencia rápida para usar la cola de Chronicle. Luego, proporcionamos una guía más detallada.
Chronicle Queue es una revista persistida de mensajes que admite escritores y lectores concurrentes incluso en múltiples JVM en la misma máquina. Cada lector ve cada mensaje, y un lector puede unirse en cualquier momento y aún ver cada mensaje.
Nota | Evitamos deliberadamente el término consumidor y, en su lugar, usamos el lector ya que los mensajes no se consumen/destruyen mediante la lectura. |
La cola de Chronicle tiene los siguientes conceptos principales:
Extracto
El extracto es el principal contenedor de datos en una cola de Chronicle. En otras palabras, cada cola crónica está compuesta de extractos. Escribir mensaje a una cola de Chronicle significa comenzar un nuevo extracto, escribir un mensaje y terminar el extracto al final.
Apéndice
Un apéndice es la fuente de mensajes; Algo como un iterador en el entorno de la crónica. Agrega datos que agreguen la cola de Chronicle actual. Puede realizar escrituras secuenciales agregando solo al final de la cola. No hay forma de insertar ni eliminar extractos.
Tailer
Un tailer es un lector de extracto optimizado para lecturas secuenciales. Puede realizar lecturas secuenciales y aleatorias, tanto hacia adelante como hacia atrás. Tasilers lee el siguiente mensaje disponible cada vez que se les llame. Los seguidores están garantizados en la cola de Chronicle:
Para cada apéndice , los mensajes se escriben en el orden que los Appender los escribieron. Los mensajes de los diferentes apéndices están entrelazados,
Para cada tailer , verá todos los mensajes para un tema en el mismo orden que cualquier otro tailer,
Cuando se replica, cada réplica tiene una copia de cada mensaje.
La cola de Chronicle no tiene corredor. Si necesita una arquitectura con un corredor, comuníquese con [email protected].
Archivos de rodillos y colas
La cola de Chronicle está diseñada para rodar sus archivos dependiendo del ciclo de balanceo elegido cuando se crea la cola (ver Rollycles). En otras palabras, se crea un archivo de cola para cada ciclo de balanceo que tiene la extensión cq4
. Cuando el ciclo de balanceo llega al punto que debe rodar, Appender escribirá atómicamente EOF
Mark al final del archivo actual para indicar que ningún otro appender debe escribir en este archivo y ningún tailer debe leer más, y en su lugar todos deben usar un archivo nuevo.
Si el proceso se cerró y se reinició más tarde, cuando el ciclo de balanceo debe usar un nuevo archivo, un apropiador intentará localizar archivos antiguos y escribir una marca EOF
en ellos para ayudar a los tasas a leerlos.
Temas
Cada tema es un directorio de archivos de cola. Si tiene un tema llamado mytopic
, el diseño podría verse así:
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4
Para copiar todos los datos para un solo día (o ciclo), puede copiar el archivo para ese día en su máquina de desarrollo para las pruebas de repetición.
Restricciones sobre temas y mensajes
Los temas se limitan a ser cadenas que pueden usarse como nombres de directorio. Dentro de un tema, puede tener subtópicos que pueden ser cualquier tipo de datos que pueda ser serializado. Los mensajes pueden ser cualquier datos serializables.
La cola de Chronicle admite:
Objetos Serializable
, aunque esto debe evitarse ya que no es eficiente
Se prefiere objetos Externalizable
si desea utilizar API Java estándar.
byte[]
y String
Marshallable
; Un mensaje de describir a sí mismo que se puede escribir como YAML, YAML BINARIO o JSON.
BytesMarshallable
, que es binario de bajo nivel, o codificación de texto.
Esta sección proporciona una referencia rápida para usar la cola de Chronicle para mostrar brevemente cómo crear, escribir/leer en/desde una cola.
Construcción de la cola de crónica
Crear una instancia de crónica de colas es diferente de simplemente llamar a un constructor. Para crear una instancia, debe usar el ChronicleQueueBuilder
.
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build ();
En este ejemplo, hemos creado un IndexedChronicle
que crea dos RandomAccessFiles
; uno para índices y otro para datos que tienen nombres relativamente:
${java.io.tmpdir}/getting-started/{today}.cq4
Escribir a una cola
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
Leyendo de una cola
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
Además, el método ChronicleQueue.dump()
se puede usar para descargar el contenido sin procesar como una cadena.
queue . dump ();
Limpieza
Chronicle Queue almacena sus datos fuera de tiempo, y se recomienda que llame close()
una vez que haya terminado de trabajar con Chronicle Queue, para recursos gratuitos.
Nota | No se perderán datos si hace esto. Esto es solo para limpiar los recursos que se utilizaron. |
queue . close ();
Poniendo todo junto
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
}
Puede configurar una cola de Chronicle utilizando sus parámetros de configuración o propiedades del sistema. Además, hay diferentes formas de escribir/leer en/desde una cola, como el uso de proxies y el uso de MethodReader
y MethodWriter
.
La cola de Chronicle (CQ) se puede configurar a través de una serie de métodos en la clase SingleChronicleQueueBuilder
. Algunos de los parámetros que más fueron consultados por nuestros clientes se explican a continuación.
Ciclista
El parámetro RollCycle
configura la velocidad a la que CQ rodará los archivos de cola subyacentes. Por ejemplo, el uso del siguiente fragmento de código dará como resultado que los archivos de cola se están rodando (es decir, un nuevo archivo creado) cada hora:
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build ()
Una vez que se ha establecido el ciclo de rollo de una cola, no se puede cambiar en una fecha posterior. Cualquier instancia adicional de SingleChronicleQueue
configurado para usar la misma ruta debe configurarse para usar el mismo ciclo de rollo, y si no lo están, entonces el ciclo de balanceo se actualizará para que coincida con el ciclo de rollo persistido. En este caso, se imprimirá un mensaje de registro de advertencia para notificar al usuario de la biblioteca sobre la situación:
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}
Salida de la consola:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.
El número máximo de mensajes que se pueden almacenar en un archivo de cola depende del ciclo de balanceo. Consulte las preguntas frecuentes para obtener más información sobre esto.
En la cola de Chronicle, el tiempo de reinversión se basa en UTC. La función de la empresa de transferencia de zona horaria extiende la capacidad de Chronicle Queue para especificar el tiempo y la periodicidad de los reinversos de la cola, en lugar de UTC. Para obtener más información, consulte la reinversión de la cola horaria.
La clase CHRONICLE Queue FileUtil
proporciona métodos útiles para administrar archivos de cola. Consulte Administrar archivos Roll directamente.
wiretype
Es posible configurar cómo la cola de Chronicle almacenará los datos estableciendo explícitamente el WireType
:
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )
Por ejemplo:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )
Aunque es posible proporcionar explícitamente el tipo de cable al crear un constructor, se desaconseja ya que aún no todos los tipos de alambre son compatibles con la cola de Chronicle. En particular, los siguientes tipos de cable no son compatibles:
Texto (y esencialmente todo basado en el texto, incluidos JSON y CSV)
CRUDO
Read_any
dimensionar
Cuando se lee/escribe una cola, parte del archivo que actualmente se lee/escribe se asigna a un segmento de memoria. Este parámetro controla el tamaño del bloque de mapeo de memoria. Puede cambiar este parámetro utilizando el método SingleChronicleQueueBuilder.blockSize(long blockSize)
si es necesario.
Nota | Debe evitar cambiar blockSize innecesariamente. |
Si está enviando mensajes grandes, entonces debe establecer un gran blockSize
, es decir, el blockSize
debe ser al menos cuatro veces el tamaño del mensaje.
Advertencia | Si usa un pequeño blockSize para mensajes grandes, recibe una IllegalStateException y se aborta la escritura. |
Le recomendamos que use el mismo blockSize
para cada instancia de cola al replicar las colas, el blockSize
no se escribe en los metadatos de la cola, por lo que idealmente debe establecerse en el mismo valor al crear sus instancias de cola de crónica (esto se recomienda pero si lo desea. Para ejecutar con un blocksize
diferente, puede).
Consejo | Use el mismo blockSize para cada instancia de colas replicadas. |
ritmo de índice
Este parámetro muestra el espacio entre extractos que se indexan explícitamente. Un número más alto significa un rendimiento de escritura secuencial más alto, pero se lee un acceso aleatorio más lento. El rendimiento de lectura secuencial no se ve afectado por esta propiedad. Por ejemplo, se puede devolver el siguiente espacio de índice predeterminado:
16 (minuciosamente)
64 (todos los días)
Puede cambiar este parámetro utilizando el método SingleChronicleQueueBuilder.indexSpacing(int indexSpacing)
.
contenedor
El tamaño de cada matriz de índice, así como el número total de matrices de índice por archivo de cola.
Nota | IndexCount 2 es el número máximo de entradas de cola indexadas. |
Nota | Consulte la indexación de extracto de sección en la cola de Chronicle de esta guía del usuario para obtener más información y ejemplos de uso de índices. |
readbuffermode, writeBuffermode
Estos parámetros definen Buffermode para lecturas o escrituras que tienen las siguientes opciones:
None
: el valor predeterminado (y el único disponible para usuarios de código abierto), sin almacenamiento en búfer;
Copy
: se usa junto con el cifrado;
Asynchronous
: use un búfer asíncrono cuando lea y/o escriba, proporcionado por el modo de asíncrono Chronicle.
capacidad de búfer
Capacidad de RingBuffer en bytes cuando se usa bufferMode: Asynchronous
En la cola de Chronicle nos referimos al acto de escribir sus datos a la cola de Chronicle, como almacenar un extracto. Estos datos podrían estar compensados de cualquier tipo de datos, incluidos texto, números o blobs serializados. En última instancia, todos sus datos, independientemente de lo que sea, se almacenan como una serie de bytes.
Justo antes de almacenar su extracto, Chronicle Queue reserva un encabezado de 4 bytes. Chronicle Queue escribe la longitud de sus datos en este encabezado. De esta manera, cuando la cola de Chronicle llega a leer su extracto, sabe cuánto tiempo dura cada mancha de datos. Nos referimos a este encabezado de 4 bytes, junto con su extracto, como documento. Se puede usar estrictamente en la cola de Chronicle para leer y escribir documentos.
Nota | Dentro de este encabezado de 4 bytes también reservamos algunos bits para una serie de operaciones internas, como el bloqueo, para hacer que la cola crónica sea segura de los hilos en procesadores y hilos. Lo importante a tener en cuenta es que debido a esto, no puede convertir estrictamente los 4 bytes en un entero para encontrar la longitud de su blob de datos. |
Como se indicó anteriormente, la cola de Chronicle usa un apéndice para escribir en la cola y un tailer para leer desde la cola. A diferencia de otras soluciones de cola Java, los mensajes no se pierden cuando se leen con un tailer. Esto se cubre con más detalle en la sección a continuación sobre "Lectura de una cola usando un tailer". Para escribir datos en una cola de Chronicle, primero debe crear un apéndice:
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}
La cola de Chronicle utiliza la siguiente interfaz de bajo nivel para escribir los datos:
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
}
El cierre en los intentos de prueba es el punto en que la longitud de los datos se escribe en el encabezado. También puede usar el DocumentContext
para averiguar el índice de que sus datos acaban de asignarse (ver más abajo). Más tarde puede usar este índice para avanzar/buscar este extracto. Cada extracto de la cola crónica tiene un índice único.
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
}
Los métodos de alto nivel a continuación, como writeText()
son métodos de conveniencia para llamar appender.writingDocument()
, pero ambos enfoques esencialmente hacen lo mismo. El código real de writeText(CharSequence text)
se ve así:
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}
Por lo tanto, tiene la opción de una serie de interfaces de alto nivel, hasta una API de bajo nivel, a la memoria sin procesar.
Esta es la API de más alto nivel que oculta el hecho de que está escribiendo para mensajes. El beneficio es que puede intercambiar llamadas a la interfaz con un componente real o una interfaz a un protocolo diferente.
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));
Puede escribir un "mensaje de autodescripción". Dichos mensajes pueden admitir cambios de esquema. También son más fáciles de entender al depurar o diagnosticar problemas.
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));
Puede escribir "Datos sin procesar" que se autodescribe. Los tipos siempre serán correctos; La posición es la única indicación en cuanto al significado de esos valores.
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));
Puede escribir "datos sin procesar" que no se autodescribe. Su lector debe saber qué significa estos datos y los tipos que se utilizaron.
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));
A continuación, se ilustra la forma de nivel más bajo de escribir datos. Obtiene una dirección a la memoria sin procesar y puede escribir lo que desee.
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});
Puede imprimir el contenido de la cola. Puede ver los dos primeros, y los últimos dos mensajes almacenan los mismos datos.
// dump the content of the queue
System . out . println ( queue . dump ());
huellas dactilares:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World
Leer la cola sigue el mismo patrón que la escritura, excepto que existe la posibilidad de que no haya un mensaje cuando intentes leerlo.
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
}
Puede convertir cada mensaje en una llamada de método en función del contenido del mensaje y hacer que la cola de Chronicle deserialice automáticamente los argumentos del método. Llamar reader.readOne()
saltará automáticamente (filtrar) cualquier mensaje que no coincida con su lector de métodos.
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());
Puede decodificar el mensaje usted mismo.
Nota | Los nombres, el tipo y el orden de los campos no tienen que coincidir. |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));
Puede leer los valores de datos autodescritos. Esto verificará los tipos son correctos y se convertirán según sea necesario.
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));
Puede leer datos sin procesar como primitivas y cuerdas.
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));
O bien, puede obtener la dirección de memoria subyacente y acceder a la memoria nativa.
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
}));
Nota | Cada tailer ve cada mensaje. |
Se puede agregar una abstracción para filtrar mensajes o asignar mensajes a un solo procesador de mensajes. Sin embargo, en general, solo necesita un tailer principal para un tema, con posiblemente, algunos tasas de apoyo para monitorear, etc.
Como Chronicle Queue no divide sus temas, obtienes pedidos totales de todos los mensajes dentro de ese tema. En todos los temas, no hay garantía de ordenar; Si desea reproducir deterministas de un sistema que se consume de múltiples temas, sugerimos reproducir desde la salida de ese sistema.
Los tasas de cola Chronicle pueden crear manejadores de archivos, los manejadores de archivos se limpian cada vez que se invoca el método close()
de la cola de Chronicle Association o cuando el JVM ejecuta una recolección de basura. Si está escribiendo su código, no tiene pausas GC y desea limpiar explícitamente los manejadores de archivos, puede llamar a lo siguiente:
(( StoreTailer ) tailer ). releaseResources ()
ExcerptTailer.toEnd()
En algunas aplicaciones, puede ser necesario comenzar a leer desde el final de la cola (por ejemplo, en un escenario de reinicio). Para este caso de uso, ExcerptTailer
proporciona el método toEnd()
. Cuando la dirección del tailer está FORWARD
(de forma predeterminada, o según lo establecido por el método ExcerptTailer.direction
), entonces llamar toEnd()
colocará el Tailer justo después del último registro existente en la cola. En este caso, el Tailer ahora está listo para leer cualquier registro nuevo adjunto a la cola. Hasta que se agregue cualquier mensaje nuevo a la cola, no habrá un nuevo DocumentContext
disponible para leer:
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();
Si es necesario leer hacia atrás a través de la cola desde el final, entonces el Tailer se puede configurar para leer hacia atrás:
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd ();
Al leer al revés, el método toEnd()
moverá el tailer al último registro en la cola. Si la cola no está vacía, entonces habrá un DocumentContext
disponible para leer:
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();
AKA llamado Tailers.
Puede ser útil tener un tailer que continúe desde donde estaba hasta el reinicio de la aplicación.
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}
Tailer "A" Last Reads Mensaje 2
Tailer "A" Next Reads Mensaje 3
Tailer "B" Last lee el mensaje 0
Tailer "B" Next lee el mensaje 1
This is from the RestartableTailerTest
where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart()
, toEnd()
, moveToIndex()
or reads a message.
Nota | The direction() is not preserved across restarts, only the next index to be read. |
Nota | The index of a tailer is only progressed when the DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4
:
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
This can often be a bit difficult to read, so it is better to dump the cq4
files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain
or net.openhft.chronicle.queue.ChronicleReaderMain
. DumpMain
performs a simple dump to the terminal while ChronicleReaderMain
handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4
file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar
, from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
Consejo | To find out which version of jars to include please, refer to the chronicle-bom . |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
This will dump the 19700101-02.cq4
file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining
Nota | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh
located in the Chonicle-Queue/bin
-folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/dump_queue.sh <file path>
ChronicleReaderMain
The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain
(in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain
:
usage: ChronicleReaderMain -a <binary-arg> Argument to pass to binary search class -b <binary-search> Use this class as a comparator to binary search -cbl <content-based-limiter> Specify a content-based limiter -cblArg <content-based-limiter-argument> Specify an argument for use by the content-based limiter -d <directory> Directory containing chronicle queue files -e <exclude-regex> Do not display records containing this regular expression -f Tail behaviour - wait for new records to arrive -g Show message history (when using method reader) -h Print this help and exit -i <include-regex> Display records containing this regular expression -k Read the queue in reverse -l Squash each output message into a single line -m <max-history> Show this many records from the end of the data set -n <from-index> Start reading from this index (eg 0x123ABE) -named <named> Named tailer ID -r <as-method-reader> Use when reading from a queue generated using a MethodWriter -s Display index -w <wire-type> Control output ie JSON -x <max-results> Limit the number of results to output -z Print timestamps using the local timezone
Just as with DumpQueue
you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build >
Once the Uber jar is present, you can run ChronicleReaderMain
from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh
which again is located in the Chonicle-Queue/bin
-folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/queue_reader.sh <options>
ChronicleWriter
If using MethodReader
and MethodWriter
then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain
or the shell script queue_writer.sh
eg
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method name
If you want to write to the below "doit" method
public interface MyInterface {
void doit ( DTO dto );
}
public class DTO extends SelfDescribingMarshallable { private int age; nombre de cadena privada; }
Then you can call ChronicleWriterMain -d queue doit x.yaml
with either (or both) of the below Yamls:
{
age : 19,
name : Henry
}
o
!x.y.z.DTO {
age : 42,
name : Percy
}
If DTO
makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface
, where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}
To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));
These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}
To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());
Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end over across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId
of the queue.
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));
A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}
First write
First read
Write of the result of the read.
What triggered this event.
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender();
the acquireAppender()
uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();
will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()
Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
Nota | You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
for more information on this see :
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
Move to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
}
You can add a StoreFileListener
to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY
cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber
providing up to 4 billion entries per day. if more messages per day are anticipated, the XLARGE_DAILY
cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor
class) as the JVM can crash when allocating a new memory mapped file if disk space becomes low enough. The disk space monitor checks (for each FileStore you are using Chronicle Queues on): that there is less than 200MB free. If so you will see:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );
otherwise it will check for the threshold percentage and log out this message:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );
The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously Chronicle Queue stores its data off-heap in a '.cq4' file. So whenever you wish to append data to this file or read data into this file, chronicle queue will create a file handle . Typically, Chronicle Queue will create a new '.cq4' file every day. However, this could be changed so that you can create a new file every hour, every minute or even every second.
If we create a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, If you had written 10 seconds worth of data and then you wish to read this data, chronicle would have to scan across 10 files. To reduce the creation of the file handles, chronicle queue cashes them lazily and when it comes to writing data to the queue files, care-full consideration must be taken when closing the files, because on most OS's a close of the file, will force any data that has been appended to the file, to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher
is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute()
method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown()
method can be called to close the supplied queue and release any other resources. Invocation of the execute()
method after shutdown()
has been called will cause an IllegalStateException
to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle
(defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
Advertencia | earlyAcquireNextCycle is off by default and if it is going to be turned on, you should very carefully stress test before and after turning it on. Basically what you experience is related to your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs
(defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs
.
SingleChronicleQueueExcerpts.dontWrite
(defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle
from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle
is set to true
and pretoucherPrerollTimeMs
to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" );
The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute()
method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
}
The method close()
, closes the Pretoucher and releases its resources.
pretouch . close ();
Nota | The Pretoucher is an Enterprise feature |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
Nota | There is a step in latency at around 10 million message per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument()
is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument()
call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close()
is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true);
Nota | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException . This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
Resultados:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark
), YMMV - always do your own benchmarks:
1 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 90.40 90.59 91.17 0.42 90: 179.52 180.29 97.50 36.14 99: 187.33 186.69 186.82 0.05 99.7: 213.57 198.72 217.28 5.86 -------------------------------------------------- -------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 179.14 179.26 180.93 0.62 90: 183.49 183.36 185.92 0.92 99: 192.19 190.02 215.49 8.20 99.7: 240.70 228.16 258.88 8.24 -------------------------------------------------- -------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 86.05 85.60 86.24 0.50 90: 170.18 169.79 170.30 0.20 99: 176.83 176.58 177.09 0.19 99.7: 183.36 185.92 183.49 0.88 -------------------------------------------------- -------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.24 85.98 86.11 0.10 90: 89.89 89.44 89.63 0.14 99: 169.66 169.79 170.05 0.10 99.7: 175.42 176.32 176.45 0.05 -------------------------------------------------- -------------------------------------------------- --------------
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 691.46 699.65 701.18 0.15 90: 717.57 722.69 721.15 0.14 99: 752.90 748.29 748.29 0.00 99.7: 1872.38 1743.36 1780.22 1.39 -------------------------------------------------- -------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 350.59 353.66 353.41 0.05 90: 691.46 701.18 697.60 0.34 99: 732.42 733.95 729.34 0.42 99.7: 1377.79 1279.49 1302.02 1.16 -------------------------------------------------- -------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 342.40 344.96 344.45 0.10 90: 357.25 360.32 359.04 0.24 99: 688.38 691.97 691.46 0.05 99.7: 1376.77 1480.19 1383.94 4.43 -------------------------------------------------- -------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 343.68 345.47 346.24 0.15 90: 360.06 362.11 363.14 0.19 99: 694.02 698.62 699.14 0.05 99.7: 1400.32 1510.91 1435.14 3.40 -------------------------------------------------- -------------------------------------------------- --------------
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain
This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop
, you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 25 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 21 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 14 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal()
if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint
into the code because thread dumps are only reported at safe-points.
Resultados:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | Throughput |
< 30 microseconds 99.3% of the time | 7 million message per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 milliseconds 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million message per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.