https://player.vimeo.com/video/201989439
A fila do Chronicle é uma estrutura de mensagens de baixa latência persistida para aplicações de alto desempenho.
Este projeto cobre a versão Java da fila Chronicle. Uma versão C ++ deste projeto também está disponível e suporta a interoperabilidade Java/C ++, além de ligações adicionais de idiomas, por exemplo, Python. Se você estiver interessado em avaliar a versão C ++, entre em contato com [email protected].
À primeira vista, a fila da crônica pode ser vista como simplesmente outra implementação da fila . No entanto, possui grandes opções de design que devem ser enfatizadas. Usando o armazenamento fora da heap , a fila Chronicle fornece um ambiente em que os aplicativos não sofrem de coleta de lixo (GC). Ao implementar aplicativos de alto desempenho e uso intensivo de memória (você ouviu o termo sofisticado "bigdata"?) Em Java, um dos maiores problemas é a coleta de lixo.
A fila do Chronicle permite que as mensagens sejam adicionadas ao final de uma fila ("anexadas"), lida na fila ("cauda") e também suporta busca de acesso aleatório.
Você pode considerar uma fila de crônica semelhante a um tópico durável/persistente sem corretores de baixa latência que pode conter mensagens de diferentes tipos e tamanhos. A fila de Chronicle é uma fila persistida e ilimitada distribuída que:
Suporta o RMI assíncrono e publique/inscreva interfaces com latências de microssegundos.
passa mensagens entre JVMs em um microssegundo
passa mensagens entre JVMs em diferentes máquinas por replicação em menos de 10 microssegundos (recurso corporativo)
Fornece latências estáveis e em tempo real, em milhões de mensagens por segundo, para um único thread para uma fila; com a ordem total de cada evento.
Ao publicar mensagens de 40 bytes, uma alta porcentagem do tempo alcançamos latências em 1 microssegundo. A latência do 99º percentil é a pior 1 em 100, e o percentil 99,9 é o pior 1 em 1000 latência.
Tamanho do lote | 10 milhões de eventos por minuto | 60 milhões de eventos por minuto | 100 milhões de eventos por minuto |
---|---|---|---|
99%ILE | 0,78 µs | 0,78 µs | 1,2 µs |
99,9%ILE | 1,2 µs | 1,3 µs | 1,5 µs |
Tamanho do lote | 10 milhões de eventos por minuto | 60 milhões de eventos por minuto | 100 milhões de eventos por minuto |
---|---|---|---|
99%ILE | 20 µs | 28 µs | 176 µs |
99,9%ILE | 901 µs | 705 µs | 5.370 µs |
Observação | 100 milhões de eventos por minuto estão enviando um evento a cada 660 nanossegundos; replicado e persistiu. |
Importante | Esse desempenho não é alcançado usando um grande conjunto de máquinas . Isso está usando um thread para publicar e um thread para consumir. |
A fila do Chronicle foi projetada para:
Seja um "Register Everything Store", que pode ler com a latência em tempo real da microssegunda. Isso suporta até os sistemas de negociação de alta frequência mais exigentes. No entanto, ele pode ser usado em qualquer aplicação em que a gravação de informações seja uma preocupação.
Apoie a replicação confiável com notificação ao Appender (Writer of Message) ou a um tailer (leitor da mensagem), quando uma mensagem foi replicada com sucesso.
A fila do Chronicle assume que o espaço em disco é barato em comparação com a memória. A fila do Chronicle faz uso total do espaço em disco que você possui e, portanto, você não está limitado pela memória principal da sua máquina. Se você usar o HDD giratório, poderá armazenar muitos TBs de espaço em disco por pouco custo.
O único software extra que a fila de Chronicle precisa executar é o sistema operacional. Não tem um corretor; Em vez disso, ele usa seu sistema operacional para fazer todo o trabalho. Se o seu aplicativo morrer, o sistema operacional continuará em execução por segundos por mais tempo, para que nenhum dado seja perdido; mesmo sem replicação.
Como o Chronicle Fileue armazena todos os dados salvos em arquivos mapeados de memória, isso possui uma sobrecarga trivial na heap, mesmo se você tiver mais de 100 TB de dados.
Chronicle fez um esforço significativo para alcançar uma latência muito baixa. Em outros produtos que se concentram no suporte a aplicativos da Web, as latências inferiores a 40 milissegundos são boas, pois são mais rápidas do que você pode ver; Por exemplo, a taxa de quadros de cinema é de 24 Hz, ou cerca de 40 ms.
A fila do Chronicle visa alcançar latências com menos de 40 microssegundos por 99% a 99,99% do tempo. Usando a fila Chronicle sem replicação, apoiamos aplicativos com latências abaixo de 40 microssegundos de ponta a ponta em vários serviços. Freqüentemente, a latência de 99% da fila de crônica depende inteiramente da escolha do sistema operacional e do subsistema de disco rígido.
A replicação para a fila de crônica suporta o Chronicle Wire Enterprise. Isso suporta uma compactação em tempo real que calcula os deltas para objetos individuais, como eles são escritos. Isso pode reduzir o tamanho das mensagens em um fator de 10 ou melhor, sem a necessidade de lote; isto é, sem introduzir latência significativa.
A fila do Chronicle também suporta compressão LZW, Snappy e Gzip. Esses formatos, no entanto, adicionam latência significativa. Isso só é útil se você tiver limitações estritas na largura de banda da rede.
A fila do Chronicle suporta uma série de semânticas:
Cada mensagem é repetida na reinicialização.
Apenas novas mensagens são reproduzidas no reinício.
Reinicie de qualquer ponto conhecido usando o índice da entrada.
Repita apenas as mensagens que você perdeu. Isso é suportado diretamente usando os construtores MethodReader/MethodWriter.
Na maioria dos sistemas System.nanoTime()
é aproximadamente o número de nanossegundos desde o último sistema (embora JVMs diferentes possam se comportar de maneira diferente). Isso é o mesmo entre JVMs na mesma máquina, mas muito diferente entre as máquinas. A diferença absoluta quando se trata de máquinas não tem sentido. No entanto, as informações podem ser usadas para detectar outliers; Você não pode determinar qual é a melhor latência, mas pode determinar a distância das melhores latências que você está. Isso é útil se você estiver se concentrando nas latências do 99º percentil. Temos uma classe chamada RunningMinimum
para obter horários de diferentes máquinas, enquanto compensa uma deriva no nanoTime
entre as máquinas. Quanto mais você faz medições, mais preciso é esse mínimo em execução.
A fila do Chronicle gerencia o armazenamento por ciclo. Você pode adicionar um StoreFileListener
que o notificará quando um arquivo for adicionado e quando não for mais retido. Você pode mover, comprimir ou excluir todas as mensagens por um dia, de uma só vez. Nota: Infelizmente no Windows, se uma operação de IO for interrompida, ela poderá fechar o FileChannel subjacente.
Por razões de desempenho, removemos a verificação de interrupções no código da fila Chronicle. Por esse motivo, recomendamos que você evite usar a fila Chronicle com código que gera interrupções. Se você não pode evitar a geração de interrupções, sugerimos que você crie uma instância separada da fila de crônica por thread.
A fila do Chronicle é mais frequentemente usada para sistemas centrados no produtor, onde você precisa reter muitos dados por dias ou anos. Para estatísticas, consulte o uso de crônica
Importante | A fila Chronicle não suporta operar nenhum sistema de arquivos de rede, seja NFS, AFS, armazenamento baseado em SAN ou qualquer outra coisa. O motivo disso é que esses sistemas de arquivos não fornecem todos os primitivos necessários para os usos de fila de arquivos mapeados por memória. Se for necessária qualquer rede (por exemplo, para tornar os dados acessíveis a vários hosts), a única maneira suportada é a replicação da fila do Chronicle (recurso corporativo). |
A maioria dos sistemas de mensagens é centrada no consumidor. O controle de fluxo é implementado para evitar que o consumidor seja sobrecarregado; até momentaneamente. Um exemplo comum é um servidor que suporta vários usuários da GUI. Esses usuários podem estar em diferentes máquinas (SO e hardware), diferentes qualidades da rede (latência e largura de banda), fazendo uma variedade de outras coisas em momentos diferentes. Por esse motivo, faz sentido para o consumidor do cliente dizer ao produtor quando recuar, adiando todos os dados até que o consumidor esteja pronto para obter mais dados.
A fila de Chronicle é uma solução centrada no produtor e faz todo o possível para nunca empurrar o produtor ou pedir para diminuir a velocidade. Isso o torna uma ferramenta poderosa, fornecendo um grande buffer entre o seu sistema e um produtor a montante sobre o qual você tem pouco, ou não, controle.
Os editores de dados de mercado não oferecem a opção de adiar o produtor por muito tempo; Se é que existe. Alguns de nossos usuários consomem dados da CME Opra. Isso produz picos de 10 milhões de eventos por minuto, enviados como pacotes UDP sem nenhuma tentativa. Se você perder ou soltar um pacote, ele está perdido. Você precisa consumir e gravar esses pacotes tão rápido quanto eles chegam até você, com muito pouco buffer no adaptador de rede. Para dados de mercado em particular, o tempo real significa em alguns microssegundos ; Isso não significa intra-dia (durante o dia).
A fila de crônica é rápida e eficiente e tem sido usada para aumentar a velocidade em que os dados são passados entre os threads. Além disso, ele também mantém um registro de cada mensagem passada, permitindo que você reduza significativamente a quantidade de registro que você precisa fazer.
Os sistemas de conformidade são exigidos por mais e mais sistemas atualmente. Todo mundo precisa tê -los, mas ninguém quer ser desacelerado por eles. Ao usar a fila do Chronicle para buffer os dados entre os sistemas monitorados e o sistema de conformidade, você não precisa se preocupar com o impacto da gravação de conformidade nos seus sistemas monitorados. Novamente, a fila Chronicle pode suportar milhões de eventos por segundo, por servidor e acessar dados que são mantidos há anos.
A fila de crônica suporta IPC de baixa latência (comunicação entre processos) entre JVMs na mesma máquina na ordem de magnitude de 1 microssegundo; bem como entre máquinas com uma latência típica de 10 microssegundos para taxas de transferência modestas de algumas centenas de milhares. A fila do Chronicle suporta taxas de transferência de milhões de eventos por segundo, com latências estáveis de microssegundos.
Veja artigos sobre o uso da fila de crônica em microsserviços
Uma fila de crônica pode ser usada para construir máquinas de estado. Todas as informações sobre o estado desses componentes podem ser reproduzidas externamente, sem acesso direto aos componentes ou ao seu estado. Isso reduz significativamente a necessidade de registro adicional. No entanto, qualquer log que você precisa pode ser registrado em grandes detalhes. Isso torna prático o login de depuração de DEBUG
. Isso ocorre porque o custo da extração é muito baixo; menos de 10 microssegundos. Os logs podem ser replicados centralmente para consolidação de log. A fila do Chronicle está sendo usada para armazenar mais de 100 TB de dados, que podem ser repetidos a partir de qualquer momento.
Os componentes de streaming não lotes são altamente executivos, determinísticos e reproduzíveis. Você pode reproduzir bugs que aparecem apenas após um milhão de eventos realizados em uma ordem específica, com horários realistas acelerados. Isso torna o uso de processamento de fluxo atraente para sistemas que precisam de um alto grau de resultados de qualidade.
Os lançamentos estão disponíveis no Maven Central como:
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >
Consulte Notas de liberação da fila do Chronicle e obtenha o número mais recente da versão. Instantâneos estão disponíveis em https://oss.sonatype.org
Observação | As classes que residem nos pacotes 'internos', 'implic' e 'main' (este último contendo vários métodos principais executáveis) e quaisquer sub-pacotes não fazem parte da API pública e podem se sujeitar a alterações em qualquer Hora de qualquer motivo . Consulte os respectivos arquivos package-info.java para obter detalhes. |
Na fila Chronicle V5 Tailers agora são somente leitura, na fila Chronicle V4, tivemos o conceito de indexação preguiçosa, onde os anexos não escreveriam índices, mas a indexação poderia ser feita pelo tailer. Decidimos soltar a indexação preguiçosa em V5; Fazer com que os Tailers somente leitura não apenas simplificem a fila de crônica, mas também nos permite adicionar otimizações em outras partes do código.
O modelo de travamento da fila de crônica foi alterado no V5, na fila Chronicle v4, o bloqueio de gravação (para evitar gravações simultâneas na fila) existe no arquivo .cq4. No V5, isso foi movido para um único arquivo chamado Store de Table (metadados.cq4t). Isso simplifica o código de bloqueio internamente, pois apenas o arquivo de armazenamento de tabela deve ser inspecionado.
Você pode usar o Chronicle Queue V5 para ler mensagens escritas com a fila Chronicle V4, mas isso não é garantido para sempre funcionar - se, por exemplo, você criou sua fila V4 com wireType(WireType.FIELDLESS_BINARY)
, o Chronicle fileee V5 não será capaz de Leia o cabeçalho da fila. Temos alguns testes para filas de leitura V5 V4, mas essas são limitadas e todos os cenários podem não ser suportados.
Você não pode usar o Chronicle Queue V5 para escrever para o Chronicle Fileue V4 filas.
A fila de Chronicle V4 é uma reescrita completa da fila de crônica que resolve os seguintes problemas que existiam na V3.
Sem mensagens de auto-descrição, os usuários tiveram que criar sua própria funcionalidade para despejar mensagens e armazenamento de dados a longo prazo. Com V4, você não precisa fazer isso, mas pode se desejar.
A fila de baunilha Chronicle criaria um arquivo por thread. Isso é bom se o número de encadeamentos for controlado, no entanto, muitos aplicativos têm pouco ou nenhum controle sobre quantos threads são usados e isso causou problemas de usabilidade.
A configuração para a Chronicle Indexada e Vanilla estava inteiramente em código, então o leitor tinha que ter a mesma configuração que os escritores e nem sempre estava claro o que era.
Não havia como o produtor saber quantos dados foram replicados para a segunda máquina. A única solução alternativa era replicar os dados de volta aos produtores.
Você precisava especificar o tamanho dos dados para reservar antes de começar a escrever sua mensagem.
Você precisava fazer seu próprio bloqueio para o Appender ao usar o Chronicle Indexado.
Na fila Chronicle V3, tudo estava em termos de bytes, não de arame. Existem duas maneiras de usar o byte na fila Chronicle V4. Você pode usar os métodos writeBytes
e readBytes
, ou pode obter os bytes()
do fio. Por exemplo:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ()));
try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}
O Chronicle Queue Enterprise Edition é uma versão comercialmente suportada de nossa fila de crônica de código aberto bem -sucedido. A documentação de código aberto é estendido pelos seguintes documentos para descrever os recursos adicionais disponíveis quando você estiver licenciado para a Enterprise Edition. Estes são:
Criptografia de filas e mensagens de mensagens. Para mais informações, consulte a documentação de criptografia.
Replicação TCP/IP (e Opcionalmente UDP) entre hosts para garantir o backup em tempo real de todos os dados da sua fila. Para obter mais informações, consulte a documentação da replicação, o protocolo de replicação da fila é abordado no protocolo de replicação.
Suporte do fuso horário para a programação diária de rolagem da fila. Para obter mais informações, consulte o suporte do fuso horário.
Suporte ao modo assíncrono para fornecer desempenho aprimorado em alta taxa de transferência em sistemas de arquivos mais lentos. Para obter mais informações, consulte o modo assíncrono e também desempenho.
Pré-toucher para melhorar os outliers, consulte o pré-toucher e sua configuração
Além disso, você será totalmente suportado por nossos especialistas técnicos.
Para obter mais informações sobre o Chronicle Queue Enterprise Edition, entre em contato com [email protected].
Uma fila de crônica é definida pela SingleChronicleQueue.class
, projetada para apoiar:
rolando arquivos diariamente, semanalmente ou a cada hora,
escritores simultâneos na mesma máquina,
Leitores simultâneos na mesma máquina ou em várias máquinas por meio de replicação TCP (com o Chronicle Queue Enterprise),
Leitores e escritores simultâneos entre o Docker ou outras cargas de trabalho em contêineres
zero cópia serialização e deserialização,
Milhões de gravações/leituras por segundo em hardware de commodities.
Aproximadamente 5 milhões de mensagens/segundo para mensagens de 96 bytes em um processador i7-4790. Uma estrutura de diretório de filas é a seguinte:
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling.
O formato consiste em bytes preparados por tamanho que são formatados usando BinaryWire
ou TextWire
. A fila do Chronicle foi projetada para ser dirigida a partir do código. Você pode adicionar facilmente uma interface que atenda às suas necessidades.
Observação | Devido à operação de nível bastante baixo, as operações de leitura/gravação da fila do Chronicle podem lançar exceções desmarcadas. Para evitar a morte de roscas, pode ser prático pegar RuntimeExceptions e registrá -los/analisá -los conforme apropriado. |
Observação | Para demonstrações de como a fila de crônica pode ser usada, consulte a demonstração da fila do Chronicle e para a documentação do Java, consulte o Chronicle Queue Javadocs |
Nas seções a seguir, primeiro introduzimos alguma terminologia e uma referência rápida para usar a fila Chronicle. Em seguida, fornecemos um guia mais detalhado.
A fila Chronicle é um diário persistido de mensagens que suporta escritores e leitores simultâneos, mesmo em vários JVMs na mesma máquina. Todo leitor vê todas as mensagens, e um leitor pode participar a qualquer momento e ainda ver todas as mensagens.
Observação | Evitamos deliberadamente o termo consumidor e, em vez disso, usamos o leitor , pois as mensagens não são consumidas/destruídas pela leitura. |
A fila de Chronicle tem os seguintes conceitos principais:
Trecho
Trecho é o principal contêiner de dados em uma fila de crônica. Em outras palavras, cada fila de crônica é composta por trechos. Escrever mensagem para uma fila de crônica significa iniciar um novo trecho, escrever uma mensagem nela e terminar o trecho no final.
Appender
Um appnder é a fonte de mensagens; algo como um iterador no ambiente de crônica. Você adiciona dados que anexam a fila de crônica atual. Ele pode executar gravações sequenciais anexando apenas ao final da fila. Não há como inserir ou excluir trechos.
Tailer
Um Tailer é um leitor de trecho otimizado para leituras seqüenciais. Ele pode executar leituras seqüenciais e aleatórias, para frente e para trás. Os Tailers leem a próxima mensagem disponível cada vez que são chamados. Os seguintes são garantidos na fila Chronicle:
Para cada Appender , as mensagens são escritas na ordem em que o Appender as escreveu. Mensagens de diferentes apêndeiros são intercaladas,
Para cada tailer , ele verá todas as mensagens para um tópico na mesma ordem que qualquer outro traseiro,
Quando replicado, cada réplica tem uma cópia de cada mensagem.
A fila de Chronicle não tem corretor. Se você precisar de uma arquitetura com um corretor, entre em contato com [email protected].
Arquivos de rolamento e fila de arquivos
A fila do Chronicle foi projetada para rolar seus arquivos, dependendo do ciclo de rolo escolhido quando a fila é criada (consulte Rollcycles). Em outras palavras, um arquivo de fila é criado para cada ciclo de rolo que possui extensão cq4
. Quando o ciclo do rolo atingir o ponto em que deve rolar, o Appender escreverá atomicamente a marca EOF
no final do arquivo atual para indicar que nenhum outro Appender deve gravar nesse arquivo e nenhum tailer deve ler mais e, em vez disso, todos devem usar um novo arquivo.
Se o processo foi desligado e reiniciado posteriormente, quando o ciclo de rolagem deve usar um novo arquivo, um Appender tentará localizar arquivos antigos e escrever uma marca EOF
neles para ajudar os Tailers a lê -los.
Tópicos
Cada tópico é um diretório de arquivos de fila. Se você tem um tópico chamado mytopic
, o layout pode ficar assim:
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4
Para copiar todos os dados para um único dia (ou ciclo), você pode copiar o arquivo para esse dia para sua máquina de desenvolvimento para testes de repetição.
Restrições sobre tópicos e mensagens
Os tópicos estão limitados a serem strings que podem ser usados como nomes de diretórios. Dentro de um tópico, você pode ter subtópicos que podem ser qualquer tipo de dados que possa ser serializado. As mensagens podem ser quaisquer dados serializáveis.
Fila de Crônicos suporta:
Objetos Serializable
, embora isso seja evitado, pois não é eficiente
Objetos Externalizable
são preferidos se você deseja usar APIs Java padrão.
byte[]
e String
Marshallable
; Uma mensagem auto -descrevendo que pode ser escrita como Yaml, Yaml binária ou JSON.
BytesMarshallable
, que é binário de baixo nível ou codificação de texto.
Esta seção fornece uma referência rápida para o uso da fila Chronicle para mostrar brevemente como criar, escrever/ler em/de uma fila.
Construção da fila de crônica
Criar uma instância da fila do Chronicle é diferente de chamar um construtor. Para criar uma instância, você deve usar o ChronicleQueueBuilder
.
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build ();
Neste exemplo, criamos um IndexedChronicle
que cria dois RandomAccessFiles
; um para índices e outro para dados com nomes relativamente:
${java.io.tmpdir}/getting-started/{today}.cq4
Escrevendo para uma fila
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
Lendo de uma fila
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
Além disso, o método ChronicleQueue.dump()
pode ser usado para despejar o conteúdo bruto como uma string.
queue . dump ();
Limpar
O Chronicle Fileue armazena seus dados fora da heap, e é recomendável que você ligue close()
depois de terminar de trabalhar com a fila Chronicle, para obter recursos gratuitos.
Observação | Nenhum dado será perdido se você fizer isso. Isso é apenas para limpar os recursos que foram usados. |
queue . close ();
Juntando tudo
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
}
Você pode configurar uma fila de crônica usando seus parâmetros de configuração ou propriedades do sistema. Além disso, existem diferentes maneiras de escrever/ler em/de uma fila, como o uso de proxies e usar MethodReader
e MethodWriter
.
A fila de Chronicle (CQ) pode ser configurada por meio de vários métodos na classe SingleChronicleQueueBuilder
. Alguns dos parâmetros mais consultados por nossos clientes são explicados abaixo.
Rollcycle
O parâmetro RollCycle
configura a taxa na qual o CQ rolaria os arquivos da fila subjacente. Por exemplo, o uso do snippet de código a seguir resultará na lição dos arquivos da fila (ou seja, um novo arquivo criado) a cada hora:
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build ()
Depois que o ciclo de rolagem de uma fila foi definido, ele não pode ser alterado posteriormente. Quaisquer outras instâncias de SingleChronicleQueue
configuradas para usar o mesmo caminho devem ser configuradas para usar o mesmo ciclo de rolagem e, se não forem, o ciclo de rolagem será atualizado para corresponder ao ciclo de rolagem persistente. Nesse caso, uma mensagem de log de aviso será impressa para notificar o usuário da biblioteca da situação:
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}
Saída do console:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.
O número máximo de mensagens que podem ser armazenadas em um arquivo de fila depende do ciclo de rolo. Veja as perguntas frequentes para obter mais informações sobre isso.
Na fila Chronicle, o tempo de rollover é baseado no UTC. O recurso Enterprise de rolagem do fuso horário estende a capacidade da fila do Chronicle de especificar o tempo e a periodicidade das rollovers da fila, em vez de UTC. Para mais informações, consulte a rolagem da fila do fuso horário.
A classe FileUtil
da fila Chronicle fornece métodos úteis para gerenciar arquivos de fila. Consulte Gerenciando arquivos de rolo diretamente.
Tenetype
É possível configurar como o Chronicle Fileue armazenará os dados definindo explicitamente o WireType
:
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )
Por exemplo:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )
Embora seja possível fornecer explicitamente o FYRETYPE ao criar um construtor, ele é desencorajado, pois nem todos os tipos de fios são suportados pela fila Chronicle ainda. Em particular, os seguintes tipos de fio não são suportados:
Texto (e essencialmente tudo baseado no texto, incluindo JSON e CSV)
CRU
Read_any
Blocksize
Quando uma fila é lida/escrita, parte do arquivo atualmente lida/escrita é mapeada para um segmento de memória. Este parâmetro controla o tamanho do bloco de mapeamento de memória. Você pode alterar esse parâmetro usando o método SingleChronicleQueueBuilder.blockSize(long blockSize)
se for necessário.
Observação | Você deve evitar a mudança blockSize desnecessariamente. |
Se você estiver enviando mensagens grandes, defina um grande blockSize
ou seja, o blockSize
deve ser pelo menos quatro vezes o tamanho da mensagem.
Aviso | Se você usar pequeno blockSize para grandes mensagens, recebe uma IllegalStateException e a gravação será abortada. |
Recomendamos que você use o mesmo blockSize
para cada instância da fila ao replicar as filas, o blockSize
não é escrito nos metadados da fila; portanto, idealmente, deve ser definido para o mesmo valor ao criar suas instâncias de fila de crônica (isso é recomendado, mas se você desejar Para executar com um blocksize
diferente, você pode).
Dica | Use o mesmo blockSize para cada instância de filas replicadas. |
Indexspacacing
Este parâmetro mostra o espaço entre trechos explicitamente indexados. Um número mais alto significa maior desempenho de gravação seqüencial, mas um acesso aleatório mais lento. O desempenho de leitura seqüencial não é afetado por esta propriedade. Por exemplo, o seguinte espaçamento de índice padrão pode ser retornado:
16 (minuciosamente)
64 (diariamente)
Você pode alterar esse parâmetro usando o método SingleChronicleQueueBuilder.indexSpacing(int indexSpacing)
.
IndexCount
O tamanho de cada matriz de índice, bem como o número total de matrizes de índice por arquivo da fila.
Observação | IndexCount 2 é o número máximo de entradas de fila indexada. |
Observação | Consulte a seção Extração Indexação na fila Crônica deste Guia do Usuário para obter mais informações e exemplos de uso de índices. |
ReadBuffermode, WriteBuffermode
Esses parâmetros definem buffermode para leituras ou gravações que possuem as seguintes opções:
None
- o padrão (e o único disponível para usuários de código aberto), sem buffer;
Copy
- usada em conjunto com a criptografia;
Asynchronous
- use um buffer assíncrono ao ler e/ou escrever, fornecido pelo modo de crônica assíncrona.
BufferCapacity
Capacidade do RingBuffer em bytes ao usar bufferMode: Asynchronous
Na fila Chronicle, nos referimos ao ato de escrever seus dados na fila do Chronicle, como armazenando um trecho. Esses dados podem ser compensados de qualquer tipo de dados, incluindo texto, números ou blobs serializados. Por fim, todos os seus dados, independentemente do que são, são armazenados como uma série de bytes.
Pouco antes de armazenar seu trecho, o Chronicle Fileue reserva-se um cabeçalho de 4 bytes. A fila do Chronicle escreve o comprimento dos seus dados neste cabeçalho. Dessa forma, quando a fila do Chronicle chega a ler seu trecho, ele sabe quanto tempo cada bolhas de dados dura. Nós nos referimos a este cabeçalho de 4 bytes, juntamente com o seu trecho, como um documento. A fila de crônica estritamente falando pode ser usada para ler e escrever documentos.
Observação | Dentro deste cabeçalho de 4 bytes, também reservamos alguns bits para várias operações internas, como o bloqueio, para fazer com que a fila da fila crônica segura os processadores e os threads. O importante a ser observado é que, por isso, você não pode converter estritamente os 4 bytes em um número inteiro para encontrar o comprimento do seu blob de dados. |
Como afirmado anteriormente, a fila Chronicle usa um Appender para escrever na fila e um traseiro para ler na fila. Ao contrário de outras soluções de fila de Java, as mensagens não são perdidas quando são lidas com um tailer. Isso é abordado com mais detalhes na seção abaixo sobre "Reading de uma fila usando um tailer". Para escrever dados em uma fila de crônica, você deve primeiro criar um Appender:
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}
A fila do Chronicle usa a seguinte interface de baixo nível para escrever os dados:
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
}
O fechamento no Try-With-RESOURCES, é o ponto em que o comprimento dos dados é gravado no cabeçalho. Você também pode usar o DocumentContext
para descobrir o índice que seus dados acabaram de ser atribuídos (veja abaixo). Mais tarde, você pode usar esse índice para mudar/procurar este trecho. Cada trecho da fila da crônica tem um índice exclusivo.
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
}
Os métodos de alto nível abaixo, como writeText()
são métodos de conveniência para chamar appender.writingDocument()
, mas ambas abordagens fazem essencialmente a mesma coisa. O código real de writeText(CharSequence text)
se parece com o seguinte:
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}
Portanto, você pode escolher várias interfaces de alto nível, até uma API de baixo nível, para a memória bruta.
Esta é a API de nível mais alto que esconde o fato de você estar escrevendo para enviar mensagens. O benefício é que você pode trocar chamadas para a interface com um componente real ou uma interface para um protocolo diferente.
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));
Você pode escrever uma "mensagem auto-descrita". Tais mensagens podem suportar alterações no esquema. Eles também são mais fáceis de entender ao depurar ou diagnosticar problemas.
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));
Você pode escrever "dados brutos", que é auto-descrevendo. Os tipos sempre estarão corretos; A posição é a única indicação quanto ao significado desses valores.
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));
Você pode escrever "dados brutos", que não são auto-descrevendo. Seu leitor deve saber o que esses dados significam e os tipos que foram usados.
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));
Abaixo, a maneira mais baixa de gravar dados é ilustrada. Você obtém um endereço para a memória bruta e pode escrever o que quiser.
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});
Você pode imprimir o conteúdo da fila. Você pode ver as duas primeiras e as duas mensagens armazenam os mesmos dados.
// dump the content of the queue
System . out . println ( queue . dump ());
impressões:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World
A leitura da fila segue o mesmo padrão que a escrita, exceto que existe a possibilidade de não haver uma mensagem quando você tenta lê -lo.
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
}
Você pode transformar cada mensagem em uma chamada de método com base no conteúdo da mensagem e fazer com que a fila crônica desserialize automaticamente os argumentos do método. Calling reader.readOne()
irá pular automaticamente (filtrar) todas as mensagens que não correspondam ao seu leitor de método.
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());
Você pode decodificar a mensagem você mesmo.
Observação | Os nomes, tipos e ordem dos campos não precisam corresponder. |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));
Você pode ler valores de dados de auto-descrição. Isso verificará os tipos está correto e converter conforme necessário.
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));
Você pode ler dados brutos como primitivas e strings.
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));
Ou você pode obter o endereço de memória subjacente e acessar a memória nativa.
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
}));
Observação | Todo Tailer vê todas as mensagens. |
Uma abstração pode ser adicionada para filtrar mensagens ou atribuir mensagens a apenas um processador de mensagens. No entanto, em geral, você só precisa de um tailer principal para um tópico, com possivelmente, alguns Tailers de apoio para monitorar etc.
Como a fila do Chronicle não participa de seus tópicos, você recebe pedidos totais de todas as mensagens nesse tópico. Nos tópicos, não há garantia de pedidos; Se você deseja reproduzir deterministicamente um sistema que consome de vários tópicos, sugerimos a repetição da saída desse sistema.
Os Tailers da fila do Chronicle podem criar manipuladores de arquivos, os manipuladores de arquivos são limpos sempre que o método close()
da fila da Chronicle associado é invocada ou sempre que a JVM executa uma coleção de lixo. Se você estiver escrevendo seu código, não possui pausas no GC e deseja explicitamente limpar os manipuladores de arquivos, você pode ligar para o seguinte:
(( StoreTailer ) tailer ). releaseResources ()
ExcerptTailer.toEnd()
Em algumas aplicações, pode ser necessário começar a ler a partir do final da fila (por exemplo, em um cenário de reinício). Para este caso de uso, ExcerptTailer
fornece o método toEnd()
. Quando a direção do Tailer é FORWARD
(por padrão , ou conforme definido pelo método de ExcerptTailer.direction
toEnd()
Nesse caso, o Tailer agora está pronto para ler quaisquer novos registros anexados à fila. Até que quaisquer novas mensagens sejam anexadas à fila, não haverá um novo DocumentContext
disponível para leitura:
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();
Se for necessário ler para trás através da fila do final, o traseiro poderá ser configurado para ler para trás:
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd ();
Ao ler para trás, o método toEnd()
moverá o tailer para o último registro na fila. Se a fila não estiver vazia, haverá um DocumentContext
disponível para leitura:
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();
Aka chamado Tailers.
Pode ser útil ter um tailer que continua de onde estava fazendo o reinício do aplicativo.
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}
Tailer "A" Last Leia Mensagem 2
Tailer "A" Próximo lê a mensagem 3
Tailer "b" last reads message 0
Tailer "b" next reads message 1
This is from the RestartableTailerTest
where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart()
, toEnd()
, moveToIndex()
or reads a message.
Observação | The direction() is not preserved across restarts, only the next index to be read. |
Observação | The index of a tailer is only progressed when the DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4
:
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
This can often be a bit difficult to read, so it is better to dump the cq4
files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain
or net.openhft.chronicle.queue.ChronicleReaderMain
. DumpMain
performs a simple dump to the terminal while ChronicleReaderMain
handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4
file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar
, from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
Dica | To find out which version of jars to include please, refer to the chronicle-bom . |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
This will dump the 19700101-02.cq4
file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining
Observação | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh
located in the Chonicle-Queue/bin
-folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/dump_queue.sh <file path>
ChronicleReaderMain
The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain
(in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain
:
usage: ChronicleReaderMain -a <binary-arg> Argument to pass to binary search class -b <binary-search> Use this class as a comparator to binary search -cbl <content-based-limiter> Specify a content-based limiter -cblArg <content-based-limiter-argument> Specify an argument for use by the content-based limiter -d <directory> Directory containing chronicle queue files -e <exclude-regex> Do not display records containing this regular expression -f Tail behaviour - wait for new records to arrive -g Show message history (when using method reader) -h Print this help and exit -i <include-regex> Display records containing this regular expression -k Read the queue in reverse -l Squash each output message into a single line -m <max-history> Show this many records from the end of the data set -n <from-index> Start reading from this index (eg 0x123ABE) -named <named> Named tailer ID -r <as-method-reader> Use when reading from a queue generated using a MethodWriter -s Display index -w <wire-type> Control output ie JSON -x <max-results> Limit the number of results to output -z Print timestamps using the local timezone
Just as with DumpQueue
you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build >
Once the Uber jar is present, you can run ChronicleReaderMain
from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh
which again is located in the Chonicle-Queue/bin
-folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/queue_reader.sh <options>
ChronicleWriter
If using MethodReader
and MethodWriter
then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain
or the shell script queue_writer.sh
eg
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method name
If you want to write to the below "doit" method
public interface MyInterface {
void doit ( DTO dto );
}
public class DTO extends SelfDescribingMarshallable { private int age; private String name; }
Then you can call ChronicleWriterMain -d queue doit x.yaml
with either (or both) of the below Yamls:
{
age : 19,
name : Henry
}
ou
!x.y.z.DTO {
age : 42,
name : Percy
}
If DTO
makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface
, where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}
To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));
These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}
To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());
Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end over across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId
of the queue.
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));
A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}
First write
First read
Write of the result of the read.
What triggered this event.
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender();
the acquireAppender()
uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();
will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()
Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
Observação | You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
for more information on this see :
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
Move to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
}
You can add a StoreFileListener
to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY
cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber
providing up to 4 billion entries per day. if more messages per day are anticipated, the XLARGE_DAILY
cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor
class) as the JVM can crash when allocating a new memory mapped file if disk space becomes low enough. The disk space monitor checks (for each FileStore you are using Chronicle Queues on): that there is less than 200MB free. If so you will see:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );
otherwise it will check for the threshold percentage and log out this message:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );
The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously Chronicle Queue stores its data off-heap in a '.cq4' file. So whenever you wish to append data to this file or read data into this file, chronicle queue will create a file handle . Typically, Chronicle Queue will create a new '.cq4' file every day. However, this could be changed so that you can create a new file every hour, every minute or even every second.
If we create a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, If you had written 10 seconds worth of data and then you wish to read this data, chronicle would have to scan across 10 files. To reduce the creation of the file handles, chronicle queue cashes them lazily and when it comes to writing data to the queue files, care-full consideration must be taken when closing the files, because on most OS's a close of the file, will force any data that has been appended to the file, to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher
is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute()
method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown()
method can be called to close the supplied queue and release any other resources. Invocation of the execute()
method after shutdown()
has been called will cause an IllegalStateException
to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle
(defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
Aviso | earlyAcquireNextCycle is off by default and if it is going to be turned on, you should very carefully stress test before and after turning it on. Basically what you experience is related to your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs
(defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs
.
SingleChronicleQueueExcerpts.dontWrite
(defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle
from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle
is set to true
and pretoucherPrerollTimeMs
to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" );
The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute()
method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
}
The method close()
, closes the Pretoucher and releases its resources.
pretouch . close ();
Observação | The Pretoucher is an Enterprise feature |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
Observação | There is a step in latency at around 10 million message per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument()
is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument()
call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close()
is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true);
Observação | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException . This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
Resultados:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark
), YMMV - always do your own benchmarks:
1 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 90.40 90.59 91.17 0.42 90: 179.52 180.29 97.50 36.14 99: 187.33 186.69 186.82 0.05 99.7: 213.57 198.72 217.28 5.86 -------------------------------------------------------- -------------------------------------------------------- ---------------- -------------------------------- SUMMARY (Concurrent2) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 179.14 179.26 180.93 0.62 90: 183.49 183.36 185.92 0.92 99: 192.19 190.02 215.49 8.20 99.7: 240.70 228.16 258.88 8.24 -------------------------------------------------------- -------------------------------------------------------- ----------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 86.05 85.60 86.24 0.50 90: 170.18 169.79 170.30 0.20 99: 176.83 176.58 177.09 0.19 99.7: 183.36 185.92 183.49 0.88 -------------------------------------------------------- -------------------------------------------------------- ---------------- -------------------------------- SUMMARY (Concurrent2) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.24 85.98 86.11 0.10 90: 89.89 89.44 89.63 0.14 99: 169.66 169.79 170.05 0.10 99.7: 175.42 176.32 176.45 0.05 -------------------------------------------------------- -------------------------------------------------------- ----------------
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 691.46 699.65 701.18 0.15 90: 717.57 722.69 721.15 0.14 99: 752.90 748.29 748.29 0.00 99.7: 1872.38 1743.36 1780.22 1.39 -------------------------------------------------------- -------------------------------------------------------- ---------------- -------------------------------- SUMMARY (Concurrent2) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 350.59 353.66 353.41 0.05 90: 691.46 701.18 697.60 0.34 99: 732.42 733.95 729.34 0.42 99.7: 1377.79 1279.49 1302.02 1.16 -------------------------------------------------------- -------------------------------------------------------- ----------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 342.40 344.96 344.45 0.10 90: 357.25 360.32 359.04 0.24 99: 688.38 691.97 691.46 0.05 99.7: 1376.77 1480.19 1383.94 4.43 -------------------------------------------------------- -------------------------------------------------------- ---------------- -------------------------------- SUMMARY (Concurrent2) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 343.68 345.47 346.24 0.15 90: 360.06 362.11 363.14 0.19 99: 694.02 698.62 699.14 0.05 99.7: 1400.32 1510.91 1435.14 3.40 -------------------------------------------------------- -------------------------------------------------------- ----------------
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain
This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop
, you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 25 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 21 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 14 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal()
if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint
into the code because thread dumps are only reported at safe-points.
Resultados:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | Throughput |
< 30 microseconds 99.3% of the time | 7 million message per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 milliseconds 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million message per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.