https://player.vimeo.com/video/201989439
Chronicleキューは、高性能アプリケーション用の持続的な低遅延メッセージングフレームワークです。
このプロジェクトは、JavaバージョンのChronicle Queueをカバーしています。このプロジェクトのC ++バージョンも利用可能であり、Java/C ++相互運用性とPythonなどの追加の言語バインディングをサポートします。 C ++バージョンの評価に興味がある場合は、[email protected]にお問い合わせください。
一見すると、クロニクルキューは単に別のキューの実装と見なすことができます。ただし、強調すべき主要な設計の選択肢があります。クロニクルキューは、 HEAPオフストレージを使用して、アプリケーションがゴミ収集(GC)に悩まされない環境を提供します。 Javaで高性能およびメモリ集約型のアプリケーション(「Bigdata」という用語を聞いたことがありますか?)を実装するとき、最大の問題の1つはゴミコレクションです。
Chronicle Queueを使用すると、メッセージをキューの最後に追加し(「Appled」)、キューから読み取り(「テール」)、ランダムアクセスシークをサポートします。
クロニクルキューは、異なるタイプとサイズのメッセージを含むことができる低レイテンシブローカーのない耐久性/永続的なトピックに似ていると考えることができます。クロニクルキューは、分散されていない無制限の持続キューです。
非同期RMIをサポートし、マイクロ秒レイテンシを使用してインターフェイスを公開/購読します。
マイクロ秒以内にJVM間でメッセージを渡します
10マイクロ秒未満の複製を介して、異なるマシン上のJVM間のメッセージを渡す(エンタープライズ機能)
1つのキューに1つのスレッドに対して、1秒あたり数百万のメッセージに安定したソフトリアルタイムのレイテンシを提供します。すべてのイベントの合計注文。
40バイトのメッセージを公開するとき、1マイクロ秒未満のレイテンシを達成する時間の割合が高い。 99パーセンタイルのレイテンシーは100分の1で最悪の1であり、99.9パーセンタイルは1000のレイテンシで最悪の1です。
バッチサイズ | 1分あたり1,000万回のイベント | 1分あたり6,000万回のイベント | 1分あたり1億回のイベント |
---|---|---|---|
99%ILE | 0.78 µs | 0.78 µs | 1.2 µs |
99.9%ILE | 1.2 µs | 1.3 µs | 1.5 µs |
バッチサイズ | 1分あたり1,000万回のイベント | 1分あたり6,000万回のイベント | 1分あたり1億回のイベント |
---|---|---|---|
99%ILE | 20 µs | 28 µs | 176 µs |
99.9%ILE | 901 µs | 705 µs | 5,370 µs |
注記 | 1分あたりの1億回のイベントは、660ナノ秒ごとにイベントを送信しています。複製されて持続しました。 |
重要 | このパフォーマンスは、マシンの大きなクラスターを使用して達成されません。これは、1つのスレッドを使用して公開し、1つのスレッドを使用して消費しています。 |
クロニクルキューは次のように設計されています。
マイクロ秒のリアルタイムレイテンシで読むことができる「すべてのストアを記録する」になります。これは、最も要求の厳しい高周波取引システムでさえもサポートします。ただし、情報の記録が懸念事項であるアプリケーションで使用できます。
メッセージが正常に複製されたときに、Appender(メッセージのライター)またはTailer(メッセージの読者)に通知を使用して信頼できる複製をサポートします。
クロニクルキューは、ディスクスペースがメモリと比較して安くなると仮定します。 Chronicle Queueは、あなたが持っているディスクスペースを最大限に活用するため、マシンのメインメモリに限定されません。スピニングHDDを使用する場合、ほとんどコストで多くのディスクスペースを保存できます。
クロニクルキューを実行する必要がある唯一の追加ソフトウェアは、オペレーティングシステムです。ブローカーはありません。代わりに、オペレーティングシステムを使用してすべての作業を行います。アプリケーションが死んだ場合、オペレーティングシステムは数秒長く実行され続けるため、データは失われません。複製がなくても。
Chronicleキューは、メモリマップされたファイルにすべて保存されたデータを保存しているため、100 TBを超えるデータがある場合でも、これには些細なオンヒープオーバーヘッドがあります。
クロニクルは、非常に低いレイテンシを達成することに多大な努力を払っています。 Webアプリケーションのサポートに焦点を当てた他の製品では、40ミリ秒未満のレイテンシが見えるよりも速いため、問題ありません。たとえば、映画のフレームレートは24 Hz、つまり約40ミリ秒です。
クロニクルキューは、99%から99.99%の時間で40マイクロ秒未満のレイテンシを達成することを目指しています。複製なしでクロニクルキューを使用して、複数のサービスにわたって40マイクロ秒未満のエンドツーエンド未満のレイテンシーを持つアプリケーションをサポートします。多くの場合、クロニクルキューの99%のレイテンシは、オペレーティングシステムとハードディスクサブシステムの選択に完全に依存しています。
Chronicleキュー用のレプリケーションは、Chronicle Wire Enterpriseをサポートしています。これは、個々のオブジェクトのデルタを計算するリアルタイム圧縮をサポートします。これにより、バッチを必要とせずに、メッセージのサイズを10倍以上削減できます。つまり、大きな遅延を導入することなくです。
Chronicleキューは、LZW、Snappy、およびGZIP圧縮もサポートしています。ただし、これらの形式は大きな遅延を追加します。これらは、ネットワーク帯域幅に厳格な制限がある場合にのみ便利です。
クロニクルキューは、多くのセマンティクスをサポートしています。
すべてのメッセージは再起動時に再生されます。
再起動時に新しいメッセージのみが再生されます。
エントリのインデックスを使用して、既知のポイントから再起動します。
見逃したメッセージのみを再生します。これは、MethodReader/MethodWriterビルダーを使用して直接サポートされます。
ほとんどのシステムでは、システムが最後に再起動されて以来、 System.nanoTime()
はほぼナノ秒数です(ただし、JVMが異なる場合があります)。これは、同じマシン上のJVM間で同じですが、マシン間で大きく異なります。マシンに関しては絶対的な違いは無意味です。ただし、情報を使用して外れ値を検出できます。最適なレイテンシーが何であるかを判断することはできませんが、あなたが最高のレイテンシからどれだけ離れているかを判断することができます。これは、99パーセンタイルのレイテンシに焦点を合わせている場合に役立ちます。 RunningMinimum
と呼ばれるクラスがあり、さまざまなマシンからタイミングを取得しながら、機械間のnanoTime
のドリフトを補正しています。測定を行う頻度が高いほど、このランニング最小はより正確です。
Chronicleキューはサイクルごとにストレージを管理します。ファイルが追加されたとき、およびそれがもはや保持されないときに通知するStoreFileListener
を追加できます。すべてのメッセージを一度に移動、圧縮、または削除することができます。注:残念ながら、Windowsでは、IO操作が中断された場合、基礎となるFileChannelを閉じることができます。
パフォーマンスの理由により、クロニクルキューコードの割り込みのチェックを削除しました。このため、割り込みを生成するコードでChronicleキューを使用しないようにすることをお勧めします。割り込みの生成を回避できない場合は、スレッドごとにクロニクルキューの個別のインスタンスを作成することをお勧めします。
クロニクルキューは、数日または数年の間、多くのデータを保持する必要がある生産者中心のシステムに最もよく使用されます。統計については、Chronicle-Queueの使用法を参照してください
重要 | Chronicleキューは、NFS、AFS、SANベースのストレージなど、ネットワークファイルシステムの操作をサポートしていません。この理由は、それらのファイルシステムが、クロニクルキューが使用するメモリマップファイルに必要なすべてのプリミティブを提供しないためです。ネットワーキングが必要な場合(たとえば、データに複数のホストがアクセスできるようにするため)、サポートされている唯一の方法はクロニクルキューレプリケーション(エンタープライズ機能)です。 |
ほとんどのメッセージングシステムは、消費者中心です。消費者が過負荷になるのを避けるために、フロー制御が実装されます。一瞬でも。一般的な例は、複数のGUIユーザーをサポートするサーバーです。これらのユーザーは、異なるマシン(OSとハードウェア)、さまざまなネットワークの品質(レイテンシと帯域幅)にあり、さまざまな時期に他のさまざまなことを行う可能性があります。このため、クライアントの消費者は、消費者がより多くのデータを取得する準備ができるまでデータを遅らせるときに、クライアントの消費者にプロデューサーに伝えることが理にかなっています。
Chronicle Queueはプロデューサー中心のソリューションであり、プロデューサーを押し戻さないように、または速度を落とすように命じないように可能な限りのことを行います。これにより、システムの間に大きなバッファーを提供する強力なツールと、コントロールがほとんどないか、いかなる上流のプロデューサーが提供されます。
市場データ出版社は、プロデューサーを長く押し戻すオプションを提供しません。もしあれば。一部のユーザーは、CME Opraからデータを消費します。これにより、1分あたり1,000万回のイベントのピークが生成され、再試行せずにUDPパケットとして送信されます。見逃したり、パケットを落としたりすると、失われます。ネットワークアダプターでのバッファリングはほとんどなく、これらのパケットを速度と同じくらい速く消費して記録する必要があります。特に市場データの場合、リアルタイムは数マイクロ秒で意味があります。日中(日中)という意味ではありません。
クロニクルキューは高速で効率的であり、スレッド間でデータが渡される速度を上げるために使用されています。さらに、渡されたすべてのメッセージの記録も保持され、必要なロギングの量を大幅に減らすことができます。
コンプライアンスシステムは、最近ではますます多くのシステムに必要です。誰もがそれらを持っている必要がありますが、誰も彼らによって遅くなりたくありません。 Chronicleキューを使用して、監視されたシステムとコンプライアンスシステム間のデータをバッファすることにより、監視されたシステムのコンプライアンス記録の影響について心配する必要はありません。繰り返しになりますが、Chronicleキューは、何年も保持されてきた1秒あたり、1秒あたりの数百万のイベントをサポートできます。
Chronicleキューは、同じマシン上のJVM間の低レイテンシIPC(インタープロセス通信)を1マイクロ秒の大きさでサポートしています。数十万人の控えめなスループットのために、典型的なレイテンシが10マイクロ秒のマシン間と同様に。 Chronicleキューは、安定したマイクロ秒レイテンシを使用して、1秒あたり数百万のイベントのスループットをサポートしています。
マイクロサービスでクロニクルキューの使用に関する記事を参照してください
クロニクルキューを使用して、状態マシンを構築できます。これらのコンポーネントの状態に関するすべての情報は、コンポーネントまたはその状態に直接アクセスすることなく、外部から再現できます。これにより、追加のロギングの必要性が大幅に減少します。ただし、必要なログはすべて詳細に記録できます。これにより、生産のDEBUG
ロギングが実用的になります。これは、伐採のコストが非常に低いためです。 10マイクロ秒未満。ログの統合のためにログを中央に複製できます。クロニクルキューは、100個以上のデータを保存するために使用されています。これは、任意の時点から再生できます。
非バッチストリーミングコンポーネントは、非常にパフォーマンスが高く、決定論的で、再現性があります。特定の順序でプレイされた100万のイベントの後にのみ表示されるバグを再現できます。これにより、高品質の結果が必要なシステムにとって、ストリーム処理を魅力的にすることができます。
リリースはMaven Centralで利用できます。
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >
クロニクルキューリリースノートを参照して、最新バージョン番号を取得します。スナップショットはhttps://oss.sonatype.orgで入手できます
注記 | パッケージ「内部」、「Impl」、および「メイン」(後者はさまざまな実行可能なメインメソッドを含む)のいずれかに存在するクラスとサブパッケージは、パブリックAPIの一部ではなく、任意の場合に変更される可能性があります。何らかの理由で時間。詳細については、それぞれのpackage-info.java ファイルを参照してください。 |
Chronicle Queue V5では、Chronicle Queue V4では、怠dazなインデックスの概念がありました。この場合、付録はインデックスを記述せず、代わりにテーラーがインデックスを作成することができます。 v5で怠zyなインデックスを削除することにしました。テーラーを読み取り専用にすると、クロニクルキューが簡素化されるだけでなく、コードの他の場所に最適化を追加することもできます。
クロニクルキューのロックモデルは、v5で変更されました。Chronicleキューv4では、.cq4ファイルには、並行の書き込みを防ぐために)書き込みロックが存在します。 V5では、これはテーブルストア(Metadata.cq4t)と呼ばれる単一のファイルに移動しました。これにより、テーブルストアファイルのみを検査する必要があるため、ロックコードが内部的に簡素化されます。
Chronicle Queue V5を使用して、Chronicle Queue V4で書かれたメッセージを読み取ることができますが、これは常に動作することを保証しません - たとえば、 wireType(WireType.FIELDLESS_BINARY)
でV4キューを作成した場合、クロニクルキューV5はできませんキューのヘッダーを読んでください。 V5の読み取りV4キューのテストがいくつかありますが、これらは限られており、すべてのシナリオがサポートされない場合があります。
Chronicle Queue V5を使用して、クロニクルキューV4キューに書き込むことはできません。
Chronicle Queue V4は、V3に存在する次の問題を解決するクロニクルキューの完全な書き直しです。
自己記述メッセージがなければ、ユーザーはメッセージをダンプするための独自の機能を作成し、データの長期的なストレージを作成する必要がありました。 V4を使用すると、これを行う必要はありませんが、必要に応じてできます。
バニラクロニクルキューは、スレッドごとにファイルを作成します。スレッドの数が制御されている場合、これは問題ありませんが、多くのアプリケーションは使用されているスレッドの数をほとんどまたはまったく制御できず、これがユーザビリティの問題を引き起こしました。
インデックス付きおよびバニラクロニクルの構成は完全にコードであったため、読者はライターと同じ構成を持たなければならなかったので、それが何であるかは常に明確ではありませんでした。
プロデューサーがA 2番目のマシンに複製されたデータの量を知る方法はありませんでした。唯一の回避策は、データをプロデューサーに再現することでした。
メッセージの書き込みを開始する前に、予約するデータのサイズを指定する必要がありました。
インデックス付きクロニクルを使用する際に、Appenderのために独自のロックを行う必要がありました。
Chronicle Queue V3では、すべてがワイヤーではなくバイトの観点からでした。 Chronicle Queue V4でBYTEを使用するには、2つの方法があります。 writeBytes
とreadBytes
メソッドを使用するか、ワイヤーからbytes()
を取得できます。例えば:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ()));
try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}
Chronicle Queue Enterprise Editionは、成功したオープンソースクロニクルキューの商業的にサポートされているバージョンです。オープンソースのドキュメントは、エンタープライズエディションのライセンスを取得したときに利用できる追加機能を説明するために、以下のドキュメントによって拡張されています。これらは:
メッセージキューとメッセージの暗号化。詳細については、暗号化のドキュメントを参照してください。
すべてのキューデータのリアルタイムバックアップを確保するために、ホスト間のTCP/IP(およびオプションでUDP)レプリケーション。詳細については、レプリケーションドキュメントを参照してください。キューレプリケーションプロトコルは複製プロトコルで説明されています。
毎日のキューロールオーバースケジューリングのタイムゾーンサポート。詳細については、TimeZoneサポートを参照してください。
Asyncモードは、遅いファイルシステムで高スループットでパフォーマンスを向上させるためのサポートをサポートします。詳細については、Asyncモードとパフォーマンスも参照してください。
改善された外れ値のためのプリタッチャーは、プリタッチャーとその構成を参照してください
さらに、あなたは当社の技術専門家によって完全にサポートされます。
Chronicle Queue Enterprise Editionの詳細については、[email protected]にお問い合わせください。
クロニクルキューは、 SingleChronicleQueue.class
によって定義されます。
毎日、毎週、または1時間ごとにファイルをローリングします。
同じマシンの同時作家、
TCPレプリケーションを介して、同じマシンまたは複数のマシンでのコンカレントリーダー(クロニクルキューエンタープライズを使用)、
Dockerまたは他のコンテナ化されたワークロードの間の同時読者と作家
ゼロコピーシリアル化と脱派化、
コモディティハードウェアに関する数百万の書き込み/読み取り。
I7-4790プロセッサの96バイトメッセージの約500万メッセージ/秒。キューディレクトリ構造は次のとおりです。
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling.
この形式は、 BinaryWire
またはTextWire
を使用してフォーマットされたサイズが固定されたバイトで構成されています。 Chronicleキューは、コードから駆動されるように設計されています。ニーズに合ったインターフェイスを簡単に追加できます。
注記 | かなり低レベルの操作により、Chronicleキューの読み取り/書き込み操作は、未チェックの例外を投げることができます。スレッドの死を防ぐためには、 RuntimeExceptions キャッチし、必要に応じてそれらをログ/分析することが実用的かもしれません。 |
注記 | クロニクルキューの使用方法については、クロニクルキューのデモを参照してください。 |
次のセクションでは、まず、いくつかの用語とクロニクルキューを使用するための簡単な参照を紹介します。次に、より詳細なガイドを提供します。
Chronicle Queueは、同じマシン上の複数のJVMにまたがる同時の作家と読者をサポートするメッセージの永続的なジャーナルです。すべての読者はすべてのメッセージを見ており、読者はいつでも参加でき、すべてのメッセージを見ることができます。
注記 | 消費者という用語を故意に回避し、代わりに読み取りによってメッセージが消費/破壊されないため、読者を使用します。 |
クロニクルキューには、次の主な概念があります。
抜粋
抜粋は、クロニクルキューのメインデータコンテナです。言い換えれば、各クロニクルキューは抜粋で構成されています。クロニクルキューにメッセージを書くということは、新しい抜粋を開始し、メッセージを書き込み、最後に抜粋を終了することを意味します。
appender
Appenderはメッセージのソースです。クロニクル環境のイテレーターのようなもの。現在のクロニクルキューを追加するデータを追加します。キューの終わりのみにAppendingのみで順次書き込みを実行できます。抜粋を挿入または削除する方法はありません。
テーラー
Tailerは、シーケンシャルリード用に最適化された抜粋リーダーです。前方と後方の両方で、シーケンシャルとランダムの読み取りを実行できます。テラーは、呼び出されるたびに次の利用可能なメッセージを読みます。以下は、クロニクルキューで保証されています。
各Appenderについて、メッセージはAppenderがそれらを書いた順序で書かれています。異なる付録によるメッセージはインターリーブされます、
各テーラーについて、他のすべてのテーラーと同じ順序でトピックのすべてのメッセージが表示されます。
複製すると、すべてのレプリカにはすべてのメッセージのコピーがあります。
Chronicleキューはブローカーレスです。ブローカーとアーキテクチャが必要な場合は、[email protected]にお問い合わせください。
ファイルローリングおよびキューファイル
Chronicleキューは、キューが作成されたときに選択されたロールサイクルに応じてファイルをロールするように設計されています(ロールサイクルを参照)。言い換えれば、拡張cq4
を持つロールサイクルごとにキューファイルが作成されます。ロールサイクルがロールするポイントに達すると、Appenderは現在のファイルの最後にEOF
マークを原子的に書き込み、他のAppenderがこのファイルに書き込み、テーラーがさらに読み取る必要がないことを示し、代わりに全員が新しいファイルを使用する必要があります。
プロセスがシャットダウンされ、ロールサイクルが新しいファイルを使用する必要があるときに後で再起動した場合、Appenderは古いファイルを見つけて、それらにEOF
マークを書き、テーラーがそれらを読むのを助けることを試みます。
トピック
各トピックは、キューファイルのディレクトリです。 mytopic
というトピックがある場合、レイアウトは次のようになります。
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4
すべてのデータを1日(またはサイクル)にコピーするには、その日のファイルを開発マシンにコピーしてリプレイテストすることができます。
トピックとメッセージの制限
トピックは、ディレクトリ名として使用できる文字列に限定されています。トピック内では、シリアル化できるデータ型になる可能性のあるサブトピックを使用できます。メッセージは、任意のシリアル化可能なデータにすることができます。
クロニクルキューはサポートしています:
Serializable
オブジェクトですが、これは効率的ではないため避けるべきです
標準のJava APIを使用する場合は、 Externalizable
オブジェクトが推奨されます。
byte[]
とString
Marshallable
; YAML、バイナリYAML、またはJSONとして書くことができる自己説明メッセージ。
低レベルのバイナリまたはテキストエンコードであるBytesMarshallable
。
このセクションでは、Chronicleキューを使用して、キューの作成、書き込み/読み取り方法を簡単に示すためのクイックリファレンスを提供します。
クロニクルキュー構造
クロニクルキューのインスタンスを作成することは、コンストラクターを呼び出すだけとは異なります。インスタンスを作成するには、 ChronicleQueueBuilder
使用する必要があります。
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build ();
この例では、2つのRandomAccessFiles
を作成するIndexedChronicle
を作成しました。 1つはインデックス用、もう1つは比較的名前を持つデータ用です。
${java.io.tmpdir}/getting-started/{today}.cq4
キューに書き込みます
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
キューから読む
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
また、 ChronicleQueue.dump()
メソッドを使用して、生の内容を文字列として捨てることができます。
queue . dump ();
掃除
Chronicle QueueはデータをHEAPから保存します。クロニクルキューの作業が終了したら、 close()
無料でリソースに呼び出すことをお勧めします。
注記 | これを行うと、データは失われません。これは、使用されたリソースをクリーンアップするためだけです。 |
queue . close ();
それをすべてまとめる
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
}
構成パラメーターまたはシステムプロパティを使用して、Chronicleキューを構成できます。さらに、プロキシの使用やMethodReader
とMethodWriter
使用など、キューに執筆/読み取り/読み取りする方法はさまざまです。
Chronicleキュー(CQ)は、 SingleChronicleQueueBuilder
クラスの多くの方法で構成できます。お客様が最も質問したパラメーターのいくつかを以下で説明します。
ロールサイクル
RollCycle
パラメーターは、CQが基礎となるキューファイルを転がすレートを構成します。たとえば、次のコードスニペットを使用すると、1時間ごとにキューファイルが展開されます(つまり、作成された新しいファイルが作成されます)。
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build ()
キューのロールサイクルが設定されると、後日変更することはできません。同じパスを使用するように構成されたSingleChronicleQueue
のさらなるインスタンスは、同じロールサイクルを使用するように構成する必要があります。この場合、ライブラリユーザーに状況を通知するために、警告ログメッセージが印刷されます。
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}
コンソール出力:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.
キューファイルに保存できるメッセージの最大数は、ロールサイクルによって異なります。これの詳細については、FAQを参照してください。
クロニクルキューでは、ロールオーバー時間はUTCに基づいています。 TimeZone Rollover Enterprise機能は、UTCではなくキューロールオーバーの時間と周期性を指定するクロニクルキューの能力を拡張します。詳細については、TimeZoneキューロールオーバーを参照してください。
ChronicleキューFileUtil
クラスは、キューファイルを管理するための便利な方法を提供します。ロールファイルの管理を直接参照してください。
盗聴
クロニクルキューがWireType
明示的に設定してデータを保存する方法を構成することができます。
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )
例えば:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )
ビルダーの作成時に盗聴を明示的に提供することは可能ですが、すべてのワイヤータイプがクロニクルキューによってまだサポートされているわけではないため、落胆しています。特に、次のワイヤータイプはサポートされていません。
テキスト(および基本的にはすべてJSONやCSVを含むテキストに基づいています)
生
read_any
ブロックサイズ
キューが読み取り/書き込まれると、現在読み取られているファイルの一部がメモリセグメントにマッピングされます。このパラメーターは、メモリマッピングブロックのサイズを制御します。このパラメーターは、必要がある場合はSingleChronicleQueueBuilder.blockSize(long blockSize)
を使用して変更できます。
注記 | blockSize 変更を不必要に避けないでください。 |
大きなメッセージを送信している場合は、大きなblockSize
を設定する必要があります。つまり、 blockSize
メッセージサイズの少なくとも4倍にする必要があります。
警告 | 大きなメッセージに小さなblockSize 使用すると、 IllegalStateException 受け取ると、書き込みが中止されます。 |
キューを複製するときにキューインスタンスごとに同じblockSize
を使用することをお勧めします。 blockSize
キューのメタデータに書き込まれていないため、理想的にはクロニクルキューのインスタンスを作成するときに同じ値に設定する必要があります(これはお勧めですが、希望する場合はお勧めしますが別のblocksize
で実行するには、できます)。
ヒント | 複製されたキューのインスタンスごとに同じblockSize を使用します。 |
インデックススペース
このパラメーターは、明示的にインデックス化された抜粋間の空間を示しています。数が多いと、シーケンシャルの書き込みパフォーマンスが高くなりますが、ランダムアクセスが遅いことを意味します。連続した読み取りパフォーマンスは、このプロパティの影響を受けません。たとえば、次のデフォルトのインデックス間隔を返すことができます。
16(細かく)
64(毎日)
このパラメーターは、メソッドSingleChronicleQueueBuilder.indexSpacing(int indexSpacing)
を使用して変更できます。
indexCount
各インデックス配列のサイズ、およびキューファイルごとのインデックス配列の総数。
注記 | IndexCount 2は、インデックス付きキューエントリの最大数です。 |
注記 | インデックスの使用の詳細と例については、このユーザーガイドのChronicleキューのセクション抜粋インデックスを参照してください。 |
readbuffermode、writebuffermode
これらのパラメーターは、次のオプションがある読み取りまたは書き込みの緩衝液を定義します。
None
- デフォルト(およびオープンソースユーザーが利用できる唯一のもの)、バッファリングなし。
Copy
- 暗号化と組み合わせて使用。
Asynchronous
- Chronicle Asyncモードによって提供される読み取りおよび/または執筆時に、非同期バッファーを使用します。
bublecercapacity
bufferMode: Asynchronous
使用する場合のバイト単位のリングバッファ容量
クロニクルキューでは、抜粋を保存するものとして、データをクロニクルキューに書き込む行為を参照します。このデータは、テキスト、番号、シリアル化されたブロブなど、任意のデータタイプから構成できます。最終的に、すべてのデータは、それが何であるかに関係なく、一連のバイトとして保存されます。
抜粋を保存する直前に、Chronicle Queueは4バイトのヘッダーを留保します。 Chronicleキューは、データの長さをこのヘッダーに書き込みます。このようにして、クロニクルキューが抜粋を読み取るようになると、データの各塊がどれくらいの期間かを知っています。この4バイトヘッダーと、抜粋とともに、ドキュメントと呼びます。厳密に言えば、クロニクルキューを使用してドキュメントの読み書きに使用できます。
注記 | この4バイトのヘッダー内では、ロックなどの多くの内部操作のためにいくつかのビットを予約して、プロセッサとスレッドの両方にクロニクルキュースレッドセーフを作成します。注意すべき重要なことは、このため、4バイトを整数に厳密に変換して、データブロブの長さを見つけることができないということです。 |
前に述べたように、Chronicle QueueはAppenderを使用してキューに書き込み、キューから読み取ります。他のJavaキューイングソリューションとは異なり、メッセージはテーラーで読まれたときにメッセージが失われません。これについては、以下のセクション「テーラーを使用したキューからの読み取り」について詳しく説明しています。クロニクルキューにデータを書き込むには、最初にAppenderを作成する必要があります。
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}
クロニクルキューは、次の低レベルインターフェイスを使用してデータを書き込みます。
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
}
リソースの試行の終わりは、データの長さがヘッダーに書き込まれるポイントです。 DocumentContext
使用して、データが割り当てられたばかりのインデックスを見つけることもできます(以下を参照)。後でこのインデックスを使用して、この抜粋を移動/検索できます。各クロニクルキューの抜粋には一意のインデックスがあります。
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
}
writeText()
などの以下の高レベルのメソッドは、 appender.writingDocument()
を呼び出すための利便性方法ですが、どちらも基本的に同じことを行います。実際のwriteText(CharSequence text)
のコードは次のようになります。
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}
そのため、低レベルのAPIまで、RAWメモリまで、多くの高レベルのインターフェイスを選択できます。
これは、あなたが書いているという事実をメッセージングにまったく隠している最高レベルのAPIです。利点は、実際のコンポーネント、または別のプロトコルへのインターフェイスでインターフェイスにコールを交換できることです。
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));
「自己説明メッセージ」を書くことができます。このようなメッセージは、スキーマの変更をサポートできます。また、問題をデバッグしたり診断したりするときは、理解しやすいです。
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));
自己説明の「生データ」を書くことができます。タイプは常に正しいです。位置は、これらの値の意味に関する唯一の兆候です。
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));
自己説明ではない「生データ」を書くことができます。読者は、このデータの意味、および使用されたタイプを知っている必要があります。
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));
以下では、データを書き込む最低レベルの方法を示します。あなたは生のメモリに住所を取得し、あなたが望むものを何でも書くことができます。
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});
キューの内容を印刷できます。最初の2つを見ることができ、最後の2つのメッセージは同じデータを保存します。
// dump the content of the queue
System . out . println ( queue . dump ());
印刷:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World
キューを読むことは、執筆と同じパターンに従いますが、読み込もうとするときにメッセージがない可能性があります。
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
}
各メッセージをメッセージのコンテンツに基づいてメソッドコールに変換し、クロニクルキューにメソッド引数を自動的にゆったりとさせることができます。 reader.readOne()
を呼び出すと、メソッドリーダーと一致しないメッセージを自動的にスキップ(フィルターアウト)します。
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());
自分でメッセージをデコードできます。
注記 | フィールドの名前、タイプ、順序は一致する必要はありません。 |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));
自己記述的なデータ値を読むことができます。これにより、タイプが正しいことを確認し、必要に応じて変換します。
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));
生データをプリミティブと文字列として読むことができます。
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));
または、基礎となるメモリアドレスを取得して、ネイティブメモリにアクセスできます。
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
}));
注記 | すべてのテーラーはすべてのメッセージを見ます。 |
抽象化を追加して、メッセージをフィルタリングするか、1つのメッセージプロセッサのみにメッセージを割り当てることができます。ただし、一般に、トピック用にメインテーラーが1つだけ必要で、監視などのサポートテーラーがあります。
Chronicle Queueはトピックを分割しないため、そのトピック内のすべてのメッセージの合計注文が得られます。トピック全体で、注文の保証はありません。複数のトピックから消費するシステムから決定的に再生する場合は、そのシステムの出力から再生することをお勧めします。
クロニクルキューテーラーは、ファイルハンドラーを作成し、関連するChronicle Queueのclose()
メソッドが呼び出されるとき、またはJVMがゴミコレクションを実行するたびに、ファイルハンドラーがクリーンアップされます。コードにGCの一時停止がない場合、ファイルハンドラーを明示的にクリーンアップしたい場合は、次のように呼び出すことができます。
(( StoreTailer ) tailer ). releaseResources ()
ExcerptTailer.toEnd()
使用する一部のアプリケーションでは、キューの終わりから読み始める必要がある場合があります(例:再起動シナリオで)。このユースケースでは、 ExcerptTailer
toEnd()
メソッドを提供します。テーラー方向がFORWARD
に(デフォルトで、またはExcerptTailer.direction
メソッドによって設定されている場合)場合、 toEnd()
を呼び出すと、キュー内の最後の既存のレコードの直後にテーラーが配置されます。この場合、Tailerはキューに追加された新しいレコードを読む準備ができました。キューに新しいメッセージが追加されるまで、読み取りできる新しいDocumentContext
ありません。
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();
最後からキューを逆方向に読み取る必要がある場合は、テーラーを後方に読むように設定できます。
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd ();
逆方向に読み取ると、 toEnd()
メソッドは、キューの最後のレコードにテーラーを移動します。キューが空でない場合は、読み取りできるDocumentContext
があります。
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();
別名「Tailers」という名前。
アプリケーションの再起動時に続く場所から続くテーラーを持つことが有用です。
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}
Tailer "a"最後にメッセージ2を読み取ります
テーラー「A」はメッセージ3を読み取ります
Tailer "B"は最後にメッセージ0を読み取ります
Tailer "b" next reads message 1
This is from the RestartableTailerTest
where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart()
, toEnd()
, moveToIndex()
or reads a message.
注記 | The direction() is not preserved across restarts, only the next index to be read. |
注記 | The index of a tailer is only progressed when the DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4
:
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
This can often be a bit difficult to read, so it is better to dump the cq4
files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain
or net.openhft.chronicle.queue.ChronicleReaderMain
. DumpMain
performs a simple dump to the terminal while ChronicleReaderMain
handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4
file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar
, from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
ヒント | To find out which version of jars to include please, refer to the chronicle-bom . |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
This will dump the 19700101-02.cq4
file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining
注記 | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh
located in the Chonicle-Queue/bin
-folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/dump_queue.sh <file path>
ChronicleReaderMain
The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain
(in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain
:
usage: ChronicleReaderMain -a <binary-arg> Argument to pass to binary search class -b <binary-search> Use this class as a comparator to binary search -cbl <content-based-limiter> Specify a content-based limiter -cblArg <content-based-limiter-argument> Specify an argument for use by the content-based limiter -d <directory> Directory containing chronicle queue files -e <exclude-regex> Do not display records containing this regular expression -f Tail behaviour - wait for new records to arrive -g Show message history (when using method reader) -h Print this help and exit -i <include-regex> Display records containing this regular expression -k Read the queue in reverse -l Squash each output message into a single line -m <max-history> Show this many records from the end of the data set -n <from-index> Start reading from this index (eg 0x123ABE) -named <named> Named tailer ID -r <as-method-reader> Use when reading from a queue generated using a MethodWriter -s Display index -w <wire-type> Control output ie JSON -x <max-results> Limit the number of results to output -z Print timestamps using the local timezone
Just as with DumpQueue
you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build >
Once the Uber jar is present, you can run ChronicleReaderMain
from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh
which again is located in the Chonicle-Queue/bin
-folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/queue_reader.sh <options>
ChronicleWriter
If using MethodReader
and MethodWriter
then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain
or the shell script queue_writer.sh
eg
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method name
If you want to write to the below "doit" method
public interface MyInterface {
void doit ( DTO dto );
}
public class DTO extends SelfDescribingMarshallable { private int age;プライベート文字列名; }
Then you can call ChronicleWriterMain -d queue doit x.yaml
with either (or both) of the below Yamls:
{
age : 19,
name : Henry
}
または
!x.y.z.DTO {
age : 42,
name : Percy
}
If DTO
makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface
, where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}
To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));
These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}
To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());
Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end over across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId
of the queue.
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));
A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}
First write
First read
Write of the result of the read.
What triggered this event.
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender();
the acquireAppender()
uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();
will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()
Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
注記 | You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
for more information on this see :
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
Move to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
}
You can add a StoreFileListener
to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY
cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber
providing up to 4 billion entries per day. if more messages per day are anticipated, the XLARGE_DAILY
cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor
class) as the JVM can crash when allocating a new memory mapped file if disk space becomes low enough. The disk space monitor checks (for each FileStore you are using Chronicle Queues on): that there is less than 200MB free. If so you will see:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );
otherwise it will check for the threshold percentage and log out this message:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );
The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously Chronicle Queue stores its data off-heap in a '.cq4' file. So whenever you wish to append data to this file or read data into this file, chronicle queue will create a file handle . Typically, Chronicle Queue will create a new '.cq4' file every day. However, this could be changed so that you can create a new file every hour, every minute or even every second.
If we create a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, If you had written 10 seconds worth of data and then you wish to read this data, chronicle would have to scan across 10 files. To reduce the creation of the file handles, chronicle queue cashes them lazily and when it comes to writing data to the queue files, care-full consideration must be taken when closing the files, because on most OS's a close of the file, will force any data that has been appended to the file, to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher
is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute()
method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown()
method can be called to close the supplied queue and release any other resources. Invocation of the execute()
method after shutdown()
has been called will cause an IllegalStateException
to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle
(defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
警告 | earlyAcquireNextCycle is off by default and if it is going to be turned on, you should very carefully stress test before and after turning it on. Basically what you experience is related to your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs
(defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs
.
SingleChronicleQueueExcerpts.dontWrite
(defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle
from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle
is set to true
and pretoucherPrerollTimeMs
to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" );
The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute()
method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
}
The method close()
, closes the Pretoucher and releases its resources.
pretouch . close ();
注記 | The Pretoucher is an Enterprise feature |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
注記 | There is a step in latency at around 10 million message per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument()
is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument()
call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close()
is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true);
注記 | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException . This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
結果:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark
), YMMV - always do your own benchmarks:
1 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) -------------- ---------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 90.40 90.59 91.17 0.42 90: 179.52 180.29 97.50 36.14 99: 187.33 186.69 186.82 0.05 99.7: 213.57 198.72 217.28 5.86 -------------------------------------------------------------- -------------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) -------------- ------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 179.14 179.26 180.93 0.62 90: 183.49 183.36 185.92 0.92 99: 192.19 190.02 215.49 8.20 99.7: 240.70 228.16 258.88 8.24 -------------------------------------------------------------- -------------------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) -------------- ---------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.05 85.60 86.24 0.50 90: 170.18 169.79 170.30 0.20 99: 176.83 176.58 177.09 0.19 99.7: 183.36 185.92 183.49 0.88 -------------------------------------------------------------- -------------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) -------------- ------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.24 85.98 86.11 0.10 90: 89.89 89.44 89.63 0.14 99: 169.66 169.79 170.05 0.10 99.7: 175.42 176.32 176.45 0.05 -------------------------------------------------------------- -------------------------------------------------------------- --------------
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) -------------- ---------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 691.46 699.65 701.18 0.15 90: 717.57 722.69 721.15 0.14 99: 752.90 748.29 748.29 0.00 99.7: 1872.38 1743.36 1780.22 1.39 -------------------------------------------------------------- -------------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) -------------- ------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 350.59 353.66 353.41 0.05 90: 691.46 701.18 697.60 0.34 99: 732.42 733.95 729.34 0.42 99.7: 1377.79 1279.49 1302.02 1.16 -------------------------------------------------------------- -------------------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) -------------- ---------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 342.40 344.96 344.45 0.10 90: 357.25 360.32 359.04 0.24 99: 688.38 691.97 691.46 0.05 99.7: 1376.77 1480.19 1383.94 4.43 -------------------------------------------------------------- -------------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) -------------- ------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 343.68 345.47 346.24 0.15 90: 360.06 362.11 363.14 0.19 99: 694.02 698.62 699.14 0.05 99.7: 1400.32 1510.91 1435.14 3.40 -------------------------------------------------------------- -------------------------------------------------------------- --------------
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain
This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop
, you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 25 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 21 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 14 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal()
if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint
into the code because thread dumps are only reported at safe-points.
結果:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | スループット |
< 30 microseconds 99.3% of the time | 7 million message per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 milliseconds 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million message per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.