https://player.vimeo.com/video/201989439
編年史隊列是用於高性能應用程序的持續存在的低延遲消息框架。
該項目涵蓋了編年史的Java版本。該項目的C ++版本也可用,並支持Java/C ++互操作性以及其他語言綁定,例如Python。如果您有興趣評估C ++版本,請聯繫[email protected]。
乍一看,紀事隊的隊列可以被視為另一個隊列實現。但是,它具有應強調的主要設計選擇。使用非主存儲,Chronicle隊列提供了一個環境,該環境不受垃圾收集(GC)的困擾。在Java中實施高性能和內存密集型應用程序(您聽到了花哨的術語“ BigData”?)時,最大的問題之一就是垃圾收集。
編年史隊列允許將消息添加到隊列的末尾(“附加”),從隊列中讀取(“尾巴”),還支持隨機訪問搜索。
您可以將編年史隊列類似於低潛伏期經紀人耐用/持續存在的主題,該主題可以包含不同類型和尺寸的消息。編年史隊列是一個分佈式無界的持續隊列,該隊列:
支持異步RMI並發布/訂閱微秒潛伏期的接口。
在微秒內傳遞JVM之間的消息
通過10微秒以下的複制(企業功能)在不同機器上的JVM之間傳遞消息
為一個隊列提供一個穩定的,柔軟的實時延遲到每秒數百萬個消息;每個事件的總訂購。
在發布40個字節消息時,我們在1微秒以下達到潛伏期的時間很高。第99個百分位延遲是100分之一的最差1,第99.9個百分位數是1000個延遲中最差的1個。
批量大小 | 每分鐘1000萬事件 | 每分鐘6000萬事件 | 每分鐘1億個活動 |
---|---|---|---|
99%的ILE | 0.78 µs | 0.78 µs | 1.2 µs |
99.9%的ILE | 1.2 µs | 1.3 µs | 1.5 µs |
批量大小 | 每分鐘1000萬事件 | 每分鐘6000萬事件 | 每分鐘1億個活動 |
---|---|---|---|
99%的ILE | 20 µs | 28 µs | 176 µs |
99.9%的ILE | 901 µs | 705 µs | 5,370 µs |
筆記 | 每分鐘1億個活動每660納秒發出活動;複製和持續。 |
重要的 | 使用大型機器無法實現此性能。這是使用一個線程發布的,而一個線程要消耗。 |
編年史隊的設計為:
成為可以使用微秒實時延遲閱讀的“記錄所有商店”。這甚至支持最苛刻的高頻交易系統。但是,它可以在信息記錄是一個問題的任何應用程序中使用。
在成功複製消息時,支持可靠的複制,並通知appender(消息的作者)或裁縫(消息的讀者)。
編年史隊列假設與內存相比,磁盤空間便宜。編年史隊列充分利用您擁有的磁盤空間,因此您不受計算機的主要內存限制。如果您使用旋轉HDD,則可以以幾乎沒有成本存儲許多TBS的磁盤空間。
Chronicle隊列需要運行的唯一額外軟件是操作系統。它沒有經紀人;相反,它使用您的操作系統來完成所有工作。如果您的應用程序死亡,則操作系統的運行持續更長的時間,因此不會丟失數據;即使沒有復制。
由於Chronicle隊列將所有保存的數據存儲在內存映射的文件中,即使您擁有超過100 TB的數據,也有一個微不足道的架子開銷。
編年史為達到非常低的延遲而付出了巨大的努力。在其他關注Web應用程序支持的產品中,少於40毫秒的潛伏期比您看到的要快得多。例如,電影幀速率為24 Hz,或大約40毫秒。
編年史的旨在以99%至99.99%的時間達到40微秒以下的潛伏期。使用編年史隊列而無需複制,我們支持在多個服務的端到端低於40微秒的潛伏期的應用程序。紀事排隊的99%延遲通常完全取決於操作系統和硬盤子系統的選擇。
編年史隊列的複制支持編年史企業。這支持實時壓縮,該壓縮計算單個對象的三角洲。這可以將消息的大小減少10倍或更高,而無需批處理;也就是說,沒有引入大量延遲。
編年史隊列還支持LZW,Snappy和GZIP壓縮。但是,這些格式增加了明顯的延遲。這些僅在您對網絡帶寬有嚴格的限制時才有用。
編年史隊列支持許多語義:
每個消息都在重新啟動時重播。
重新啟動時僅播放新消息。
使用條目的索引從任何已知點重新啟動。
僅重播您錯過的消息。使用MethodReader/MethodWriter構建器直接支持這一點。
在大多數係統系統上, System.nanoTime()
大約是系統上一次重新啟動以來的納秒數(儘管不同的JVM的行為可能有所不同)。在同一台計算機上的JVM之間是相同的,但在機器之間卻大不相同。關於機器的絕對差異是毫無意義的。但是,該信息可用於檢測異常值。您無法確定最好的延遲是什麼,但是您可以確定距離最佳潛伏期的距離。如果您專注於第99%的延遲,這將很有用。我們有一個名為RunningMinimum
的類,可以從不同的機器中獲取時間,同時補償了機器之間的nanoTime
漂移。您進行測量的頻率越多,該運行最小值的準確性就越準確。
編年史隊列通過週期管理存儲。您可以添加一個StoreFileListener
該商店將在添加文件時以及不再保留文件時通知您。您可以一次移動,壓縮或刪除所有消息。注意:不幸的是,在Windows上,如果IO操作被中斷,則可以關閉基礎Filechannel。
由於表現原因,我們已刪除了編年史列代碼中的中斷檢查。因此,我們建議您避免將編年史隊列與生成中斷的代碼使用。如果您無法避免生成中斷,那麼我們建議您創建一個單獨的每個線程編年史的實例。
Chronicle隊列通常用於以生產者為中心的系統,您需要在幾天或幾年內保留大量數據。有關統計信息,請參見紀事的使用情況
重要的 | Chronicle隊列不支持在任何網絡文件系統中運行,無論是NFS,AFS,基於SAN的存儲還是其他任何東西。這樣做的原因是這些文件系統沒有為記憶映射文件的所有必需的原始紀錄提供編年史的使用。如果需要任何網絡(例如,使數據可訪問多個主機),則唯一支持的方法是編年史隊列複製(企業功能)。 |
大多數消息傳遞系統以消費者為中心。實施流量控制是為了避免消費者曾經超負荷;甚至暫時。一個常見的示例是支持多個GUI用戶的服務器。這些用戶可能會在不同的機器(OS和硬件),網絡的不同質量(延遲和帶寬)上,在不同的時間做其他事情。因此,客戶消費者告訴生產者何時退縮是有意義的,延遲任何數據,直到消費者準備獲取更多數據。
Chronicle隊列是以生產者為中心的解決方案,可以盡一切可能永遠不會推遲生產者,或者告訴它放慢速度。這使其成為一個強大的工具,可在系統之間提供一個很大的緩衝區,以及您幾乎無法控制的上游生產商。
市場數據出版商不會讓您選擇長期推薦生產商。如果有的話。我們的一些用戶從CME OPRA消耗數據。這會產生每分鐘1000萬次活動的峰值,以UDP數據包發送而無需重試。如果您錯過或放下數據包,那麼它會丟失。您必須盡快消耗並記錄這些數據包,而網絡適配器中的緩衝很少。特別是對於市場數據,實時意味著在幾微秒內;這並不意味著日內(白天)。
編年史隊列效率快,有效,已用於提高線程之間數據傳遞的速度。此外,它還保留了每條傳遞的每條消息的記錄,從而使您可以大大減少需要執行的記錄量。
如今,越來越多的系統需要合規系統。每個人都必須擁有它們,但是沒有人願意被他們放慢腳步。通過使用編年史隊列在監視系統和合規系統之間進行緩衝數據,您無需擔心合規性記錄對監視系統的影響。同樣,編年史隊列可以支持數年保留多年的每秒,每服務器和訪問數據的數百萬事件。
編年史隊列以1微秒為單位的數量級,支持同一機器上JVM之間的低延遲IPC(相互處理交流);以及在典型的延遲時間為10微秒的機器之間,適度的吞吐量為數十萬。編年史隊列支持每秒數百萬事件的吞吐量,並具有穩定的微秒延遲。
請參閱有關在微服務中使用紀事排隊的文章
編年史可以用來建造狀態機。有關這些組件狀態的所有信息都可以在外部複製,而無需直接訪問組件或其狀態。這大大減少了對額外記錄的需求。但是,您確實需要的任何記錄都可以詳細記錄。這使得啟用生產實用的DEBUG
記錄。這是因為記錄的成本非常低。小於10微秒。可以將日誌集中復制以進行日誌整合。編年史隊列用於存儲100多個TB數據,可以從任何時間點重播。
非批次流組件具有高度性能,確定性和可再現的。您可以復制僅在以特定順序進行的一百萬個事件出現並加速逼真的時間之後出現的錯誤。這使得使用流處理對需要高質量質量結果的系統有吸引力。
在Maven Central上可以使用:
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >
請參閱編年史隊列發行說明並獲取最新版本號。快照可在https://oss.sonatype.org上找到
筆記 | 駐留在“內部”包中的任何一個,“ impl”和“ main”(後者包含各種可運行的主要方法)和任何子包裝的類中都不是公共API的一部分,並且可能會在任何情況下更改出於任何原因的時間。有關詳細信息,請參見相應的package-info.java 文件。 |
現在,在編年史隊的V5裁縫中,只讀的是,在編年史隊列V4中,我們具有懶惰索引的概念,在這裡,附錄不會編寫索引,而是可以由尾聲完成索引。我們決定在V5中放下懶惰的索引;使裁縫只讀不僅簡化了編年史隊列,而且還允許我們在代碼其他地方添加優化。
v5中更改了編年史隊列的鎖定模型,在.cq4文件中存在寫入鎖(以防止並發寫入隊列)。在V5中,這移至一個名為Table Store(Metadata.cq4t)的單個文件。這簡化了內部鎖定代碼,因為只能檢查表存儲文件。
您可以使用Chronicle隊列V5讀取使用Chronicle隊列V4編寫的消息,但這不能始終可以始終工作 - 例如,如果您使用wireType(WireType.FIELDLESS_BINARY)
創建了V4隊列,那麼Chronicle decorele decornle dechornicle decor v5將無法閱讀隊列的標題。我們對V5閱讀V4隊列的測試進行了一些測試,但這些測試是有限的,所有場景可能不支持。
您不能使用編年史隊列V5寫信給編年史隊列V4隊列。
紀事排隊V4是編年史隊列的完整重寫,該編織隊列解決了V3中存在的以下問題。
如果沒有自描述消息,用戶就必須創建自己的功能來傾倒消息和長期存儲數據。使用V4,您不必這樣做,但是如果您願意的話,您可以。
Vanilla Chronicle隊列將每個線程創建一個文件。如果控制螺紋的數量,這很好,但是,許多應用程序對使用多少線程幾乎沒有控制,這會導致可用性問題。
索引和香草紀事的配置完全是代碼中的,因此讀者必須具有與作家相同的配置,並且並不總是清楚那是什麼。
生產商無法知道第二台機器已將多少數據複製到了。唯一的解決方法是將數據複製回生產商。
您需要在開始寫消息之前指定要保留的數據大小。
使用索引編年史時,您需要自己鎖定自己的鎖定。
在編年史隊列V3中,一切都是按字節而不是電線表示的。在編年史隊列V4中使用字節有兩種方法。您可以使用writeBytes
和readBytes
方法,也可以從電線中獲取bytes()
。例如:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ()));
try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}
Chronicle隊列企業版是我們成功的開源編年史隊列的商業支持版本。以下文檔擴展了開源文檔,以描述您獲得企業版許可時可用的其他功能。這些都是:
消息隊列和消息的加密。有關更多信息,請參見加密文檔。
主機之間的TCP/IP(以及可選的UDP)複製,以確保所有隊列數據的實時備份。有關更多信息,請參見複製文檔,以復制協議介紹了隊列複製協議。
時區支持每日隊列翻車時間表。有關更多信息,請參見時區支持。
異步模式支持以在較慢的文件系統上提供高吞吐量的改進性能。有關更多信息,請參見異步模式以及性能。
改進異常值的預觸摸器,請參閱預打擊器及其配置
此外,我們的技術專家將為您提供充分的支持。
有關Chronicle隊列企業版的更多信息,請聯繫[email protected]。
編年史隊列由旨在支持的SingleChronicleQueue.class
典型定義:
每天,每週或每小時滾動文件,
同一台機器上的並發作家,
通過TCP複製(使用Chronicle Queue Enterprise),同一台機器上或跨多個計算機上的同時讀取器,
Docker或其他容器的工作負載之間的同時讀者和作家
零複製序列化和避免化,
數以百萬計的商品硬件每秒寫入/讀取。
i7-4790處理器上的96個字節消息約為500萬條消息/秒。隊列目錄結構如下:
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling.
該格式由使用BinaryWire
或TextWire
格式的大小預定字節組成。編年史隊的設計旨在從代碼中驅動。您可以輕鬆添加適合您需求的接口。
筆記 | 由於相當低的操作,編年史隊列讀/寫操作可能會引發不受限制的例外。為了防止線程死亡,可以捕獲RuntimeExceptions 並將其記錄/分析它們可能是實際的。 |
筆記 | 有關如何使用編年史隊列的演示 |
在以下各節中,首先,我們介紹了一些術語和快速參考,以使用編年史隊列。然後,我們提供了更詳細的指南。
Chronicle隊列是一本持續存在的消息雜誌,即使在同一台計算機上的多個JVM中,也支持並發作家和讀者。每個讀者都會看到每一個消息,讀者可以隨時加入,仍然看到每個消息。
筆記 | 我們故意避免使用消費者一詞,而是使用閱讀器,因為閱讀不會消耗/破壞消息。 |
編年史隊列具有以下主要概念:
摘抄
摘錄是編年史隊列中的主要數據容器。換句話說,每個編年史隊列都由摘錄組成。將消息寫給編年史隊列意味著開始新的摘錄,將消息寫入其中以及在最後完成摘錄。
appender
附錄是消息的來源;像編年史環境中的迭代器一樣。您添加附加當前編年史隊列的數據。它可以通過附加到隊列結束來執行順序寫入。沒有辦法插入或刪除摘錄。
裁縫
裁縫是一位優化用於順序讀取的讀者。它可以執行前進和向後進行順序和隨機讀取。每次被調用時,定訂者都會閱讀下一條可用的消息。在編年史中保證以下內容:
對於每個附錄,郵件是按照附錄編寫的順序編寫的。不同附錄的消息交錯,
對於每個裁縫,它將看到一個主題的每條消息,以與其他所有裁縫相同的順序,
複製後,每個複製品都有每個消息的副本。
編年史隊列不再是經紀人。如果您需要經紀人的架構,請聯繫[email protected]。
文件滾動和隊列文件
Chronicle隊列旨在根據創建隊列時選擇的滾動週期滾動文件(請參閱Rollcycles)。換句話說,為具有擴展名cq4
的每個滾動週期創建一個隊列文件。當滾動週期達到應該滾動的點時,Appender將在當前文件的末尾原子寫EOF
標記,以表明沒有其他Appender不得寫入此文件,並且不得進一步讀取器,而每個人都應該使用新文件。
如果該過程被關閉,並在滾動週期應使用新文件時稍後重新啟動,則Appender會嘗試找到舊文件並在其中編寫EOF
標記,以幫助裁縫讀取它們。
主題
每個主題都是隊列文件的目錄。如果您有一個稱為mytopic
的主題,那麼佈局可能會如下:
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4
要復制一天(或循環)的所有數據,您可以將當天的文件複製到開發計算機進行重播測試。
對主題和消息的限制
主題僅限於可以用作目錄名稱的字符串。在一個主題中,您可以擁有可以序列化的任何數據類型的子主題。消息可以是任何可序列化數據。
編年史隊列支持:
Serializable
對象,儘管這是不高效的,但要避免
如果您希望使用標準Java API,則首選Externalizable
對象。
byte[]
和String
Marshallable
;一個可以寫為yaml,二進制YAML或JSON的自我描述的信息。
BytesMarshallable
,是低級二進製或文本編碼的。
本節提供了快速參考,用於使用編年史隊列簡要顯示如何創建,從隊列中進行/讀取/閱讀。
編年史隊列結構
創建編年史隊列的實例與僅調用構造函數不同。要創建一個實例,您必須使用ChronicleQueueBuilder
。
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build ();
在此示例中,我們創建了一個IndexedChronicle
,該索引創建了兩個RandomAccessFiles
。一個用於索引,一個用於具有相對名稱的數據:
${java.io.tmpdir}/getting-started/{today}.cq4
寫入隊列
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
從隊列閱讀
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
另外, ChronicleQueue.dump()
方法可用於將原始內容倒入字符串。
queue . dump ();
清理
Chronicle隊列將其數據放在外圍,建議您在使用Chronicle隊列工作後調用close()
以免費資源。
筆記 | 如果這樣做,不會丟失數據。這只是清理所使用的資源。 |
queue . close ();
將所有這些放在一起
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
}
您可以使用其配置參數或系統屬性來配置編年史隊。此外,從諸如代理的使用以及使用MethodReader
和MethodWriter
等隊列中寫入/閱讀的方式有不同的方式。
編年史隊列(CQ)可以通過SingleChronicleQueueBuilder
類上的多種方法進行配置。下面解釋了我們客戶最查詢的一些參數。
滾動
RollCycle
參數配置CQ將滾動基礎隊列文件的速率。例如,使用以下代碼段將導致每小時滾動的隊列文件(即創建的新文件):
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build ()
一旦設置了隊列的滾動循環,就無法在以後更改。配置為使用相同路徑的SingleChronicleQueue
的任何其他實例都應配置為使用相同的滾動循環,如果不是,則將更新滾動循環以匹配持久的滾動週期。在這種情況下,將打印一條警告日誌消息,以將情況通知庫用戶:
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}
控制台輸出:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.
可以在隊列文件中存儲的最大消息數取決於滾動週期。有關此信息的更多信息,請參見常見問題解答。
在編年史隊列中,翻轉時間基於UTC。時區翻轉企業功能擴展了編年史隊列的能力,可以指定隊列滾動的時間和周期性,而不是UTC。有關更多信息,請參見時區隊列翻車。
編年史的FileUtil
類提供了管理隊列文件的有用方法。請參閱直接管理滾動文件。
竊聽
可以通過明確設置WireType
來配置編年史隊列如何存儲數據:
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )
例如:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )
儘管在創建構建器時可以顯式提供竊聽,但由於並非所有電線類型都由編年史隊支持。特別是,不支持以下電線類型:
文本(本質上都是基於文本,包括JSON和CSV)
生的
read_any
阻止
當讀取/書寫隊列時,當前正在讀取/書面的文件的一部分將映射到內存段。此參數控制內存映射塊的大小。您可以使用方法SingleChronicleQueueBuilder.blockSize(long blockSize)
來更改此參數。
筆記 | 您應該避免不必要地更換blockSize 。 |
如果您要發送大消息,則應設置一個blockSize
,即blockSize
至少是消息大小的四倍。
警告 | 如果您使用小型blockSize 用於大郵件,則會收到IllegalStateException 並且寫入中止。 |
我們建議您在復制隊列時對每個隊列實例使用相同的blockSize
,而不會寫入隊列的元blockSize
,因此在創建紀事排隊的實例時,理想情況下應設置為相同的值(如果您願意,但是如果您希望使用要使用不同的blocksize
運行)。
提示 | 對於復制的隊列的每個實例,使用相同的blockSize 。 |
索引空間
此參數顯示了明確索引的摘錄之間的空間。較高的數字意味著更高的順序寫入性能,但隨機訪問讀數較慢。順序讀取性能不受此屬性的影響。例如,可以返回以下默認索引間距:
16(細微)
64(每日)
您可以使用SingleChronicleQueueBuilder.indexSpacing(int indexSpacing)
更改此參數。
索引
每個索引數組的大小以及每個隊列文件的索引數組的總數。
筆記 | 索引2是索引隊列條目的最大數量。 |
筆記 | 有關更多信息和使用索引的示例,請參見本用戶指南編年史中的摘要索引。 |
deadbuffermode,writebuffermode
這些參數定義了具有以下選項的讀取或寫入的buffersode:
None
- 默認值(也是唯一可用於開源用戶的一個),沒有緩衝;
Copy
- 與加密結合使用;
Asynchronous
- 在閱讀和/或寫作時使用異步緩衝液,由編年史異步模式提供。
緩衝
使用bufferMode: Asynchronous
在編年史隊列中,我們將您的數據寫入編年史隊列的行為是存儲摘錄的。這些數據可以由任何數據類型組成,包括文本,數字或序列化斑點。最終,您的所有數據,無論它是什麼數據,都被存儲為一系列字節。
在存儲摘錄之前,Chronicle隊列保留了一個4字節的標題。編年史隊列將數據的長度寫入此標題。這樣,當Chronicle隊列閱讀您的摘錄時,它就會知道每個數據斑點有多長時間。我們將此4個字節標頭以及您的摘錄稱為文檔。嚴格地說,紀事的隊列可用於讀寫文檔。
筆記 | 在這個4個字節的標頭中,我們還為許多內部操作(例如鎖定)保留了一些位,以使紀事排隊螺紋在處理器和線程之間進行紀錄隊列線程安全。要注意的重要一點是,因此,您無法嚴格地將4個字節轉換為整數以找到數據斑點的長度。 |
如前所述,Chronicle隊列使用Appender寫信給隊列,並從隊列中讀取尾聲。與其他Java排隊解決方案不同,隨著裁縫的閱讀,消息不會丟失。在下面的“使用尾聲從隊列閱讀”的部分中,這是更詳細的詳細信息。要將數據寫入編年史隊列,您必須首先創建一個appender:
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}
編年史隊列使用以下低級接口來編寫數據:
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
}
try-with-resources上的近距離是將數據的長度寫入標題的重點。您還可以使用DocumentContext
來找出剛剛分配了數據的索引(請參見下文)。您稍後可以使用此索引移動/查找此摘錄。每個編年史隊列摘錄都有一個唯一的索引。
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
}
下面的高級方法(例如writeText()
是調用appender.writingDocument()
便利方法,但兩種方法基本上都在做同樣的事情。 writeText(CharSequence text)
的實際代碼如下:
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}
因此,您可以選擇多個高級接口,直至低水平的API到原始內存。
這是最高級別的API,它隱藏了您所寫的事實。好處是,您可以將調用與真實組件或接口將接口交換為其他協議。
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));
您可以寫一個“自我描述消息”。這樣的消息可以支持模式更改。在調試或診斷問題時,它們也更容易理解。
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));
您可以編寫自我描述的“原始數據”。類型始終是正確的;位置是這些值的含義的唯一跡象。
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));
您可以編寫不是自稱的“原始數據”。您的讀者必須知道此數據的含義以及所使用的類型。
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));
下面說明了寫數據的最低級別的方式。您可以獲得原始內存的地址,並且可以編寫任何想要的內容。
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});
您可以打印隊列的內容。您可以看到前兩個,最後兩個消息存儲相同的數據。
// dump the content of the queue
System . out . println ( queue . dump ());
印刷:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World
閱讀隊列與寫作相同的模式遵循,除了您嘗試閱讀時可能沒有消息。
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
}
您可以根據消息的內容將每個消息轉換為一個方法調用,並使編年史隊列自動對該方法參數進行挑選。調用reader.readOne()
將自動跳過(過濾掉)任何與方法讀取器不匹配的消息。
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());
您可以自己解碼消息。
筆記 | 字段的名稱,類型和順序不必匹配。 |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));
您可以讀取自我描述的數據值。這將檢查類型正確,並根據需要進行轉換。
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));
您可以將原始數據讀取為原始數據和字符串。
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));
或者,您可以獲取基礎內存地址並訪問本機內存。
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
}));
筆記 | 每個裁縫都會看到每個消息。 |
可以將抽象添加到過濾消息中,也可以將消息分配給一個消息處理器。但是,總的來說,您只需要一個主題的主裁縫,可能還有一些支持監視的裁縫。
由於Chronicle隊列沒有劃分其主題,因此您可以在該主題中獲得所有消息的總訂購。在整個主題中,沒有保證訂購;如果您想從多個主題中消耗的系統確定性重播,我們建議從該系統的輸出中重播。
編年史隊列裁縫可能會創建文件處理程序,只要調用相關編年史的close()
方法,或者每當JVM運行垃圾集合時,就會清理文件處理程序。如果您編寫的代碼沒有GC停頓,並且明確想清理文件處理程序,則可以調用以下內容:
(( StoreTailer ) tailer ). releaseResources ()
ExcerptTailer.toEnd()
在某些應用中,可能有必要從隊列結束開始閱讀(例如,在重新啟動方案中)。對於此用例, ExcerptTailer
提供了toEnd()
方法。當尾聲方向FORWARD
(默認情況下,或按ExcerptTailer.direction
方法設置)時,呼叫toEnd()
將在隊列中的最後一個現有記錄之後放置尾聲。在這種情況下,Tailer現在準備好閱讀附加到隊列的任何新記錄。在將任何新消息附加到隊列上之前,將沒有可供閱讀的新DocumentContext
:
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();
如果有必要從末端從隊列中向後讀取,則可以將尾巴固定為向後讀取:
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd ();
向後讀取時, toEnd()
方法將將尾聲移至隊列中的最後一個記錄。如果隊列不是空的,則將有一個DocumentContext
可供閱讀:
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();
又名名為定的人。
擁有一個尾巴,該裁縫可以從該應用程序的重新啟動時繼續進行。
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}
尾聲“ a”最後讀消息2
尾聲“ a”下一個讀取消息3
尾聲“ B”最後讀消息0
尾聲“ B”下一步讀取消息1
這是來自RestartableTailerTest
那裡有兩個裁縫,每個裁縫都有一個唯一的名稱。這些裁縫將其索引存儲在隊列本身中,並且隨著Tailer使用toStart()
, toEnd()
, moveToIndex()
或讀取消息時,該索引將被保留。
筆記 | direction() 不保留在重新啟動之間,只有下一個要讀取的索引。 |
筆記 | 只有在調用DocumentContext.close() 時,尾巴的索引才能進行。如果錯誤阻止了這一點,則將在每個重新啟動時讀取相同的消息。 |
編年史隊列以二進制格式存儲其數據,並使用cq4
的文件擴展名:
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
這通常很難讀取,因此最好將cq4
文件作為文本傾倒。這也可以幫助您解決生產問題,因為它使您可以了解隊列中存儲的內容以及按什麼順序。
您可以使用net.openhft.chronicle.queue.main.DumpMain
或net.openhft.chronicle.queue.ChronicleReaderMain
將隊列傾倒到終端。 DumpMain
向終端執行一個簡單的垃圾場,而ChronicleReaderMain
處理更複雜的操作,例如尾隨隊列。它們都可以從命令行以下面描述的多種方式運行。
如果您有一個包括編年史偽像的項目POM文件,則可以讀取帶有以下命令的cq4
文件:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
在上面的命令中, myqueue是包含.cq4文件的目錄
您還可以手動設置任何因文件。這需要chronicle-queue.jar
,從任何版本4.5.3或更高版本中,並且類路徑上都存在所有相關文件。依賴的罐子在下面列出:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
提示 | 要找出包含哪種版本的罐子,請參閱《 chronicle-bom 。 |
一旦依賴關係存在於班級路徑上,您就可以運行:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
這將把19700101-02.cq4
文件作為文本輸出,如下所示:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining
筆記 | 上面的示例沒有顯示任何用戶數據,因為沒有將用戶數據寫入此示例文件。 |
還有一個名為dump_queue.sh
的腳本位於Chonicle-Queue/bin
-folder中,該腳本將所需的依賴關係聚集在陰影罐中,並用它將其與DumpMain
一起傾倒了隊列。腳本可以從Chronicle-Queue
根文件夾中運行:
$ ./bin/dump_queue.sh <file path>
ChronicleReaderMain
閱讀隊列記錄紀事排隊內容物的第二個工具是紀事中的ChronicleReaderMain
)。如上所述,除了將文件內容打印到控制台外,它還可以執行多個操作。例如,它可用於尾流,以檢測每當添加新消息時(例如$ tail -f)。
以下是用於配置ChronicleReaderMain
命令行接口:
用法:Chroniclonereadermain -a <Binary-arg>參數傳遞給二進制搜索類 -b <Binary-Search>使用此類用作二進制搜索的比較器 -cbl <基於內容的限制器>指定基於內容的限制器 -cblarg <基於內容的限制器 - argument>指定一個參數,用於基於內容的限制器使用 -d <Directory>包含編年史隊列文件的目錄 -e <Exclude-regex>不顯示包含此正則表達式的記錄 -f尾巴行為 - 等待新記錄到達 -g顯示消息歷史記錄(使用方法讀取器時) -H打印此幫助並退出 -i <include-regex>顯示包含此正則表達式的記錄 -K反向閱讀隊列 -l將每個輸出消息擠入一行 -m <ax-History>顯示了數據集末尾的許多記錄 -n <frof-index>從此索引開始閱讀(例如0x123abe) - 名稱<nual>命名塔式ID -r <as-method-reader>在使用MethodWriter生成的隊列中閱讀時使用 -s顯示索引 -w <線型>控制輸出IE JSON -x <max-results>將結果數限制為輸出的數量 -Z使用本地時區打印時間戳
就像與DumpQueue
一樣,您需要在類路徑上顯示的上面示例中的類。這可以通過手動添加然後運行來再次實現:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
另一個選擇是使用Maven Shade插件創建一個Uber Jar。配置如下:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build >
一旦存在Uber罐,您就可以ChronicleReaderMain
:
java -cp“ $ uber_jar” net.openhft.chronicle.queue.chronicleadermain“ 19700101-02.cq4”
最後,有一個用於運行名為queue_reader.sh
的讀取器的腳本,該腳本再次位於Chonicle-Queue/bin
-folder中。它會自動收集陰影罐中所需的依賴項,並使用它來運行ChronicleReaderMain
。腳本可以從Chronicle-Queue
根文件夾中運行:
$ ./bin/queue_reader.sh <options>
ChronicleWriter
寫入隊列如果使用MethodReader
和MethodWriter
,則可以使用net.openhft.chronicle.queue.ChronicleWriterMain
或shell Script queue_writer.sh
將單題詞方法調用編寫為隊列。
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method name
如果您想寫入以下“ doit”方法
public interface MyInterface {
void doit ( DTO dto );
}
公共類DTO擴展了selfdeScrivingMarshalable {私人int age;私有字符串名稱; }
然後,您可以使用以下YAML的(或兩個)來調用ChronicleWriterMain -d queue doit x.yaml
:
{
age : 19,
name : Henry
}
或者
!x.y.z.DTO {
age : 42,
name : Percy
}
如果DTO
利用自定義序列化,則應指定與-i
一起寫入的接口
Chronicle v4.4+支持使用代理編寫和閱讀消息的使用。您首先定義一個異步interface
,所有方法都有:
僅是輸入的參數
沒有回報值或預期的例外。
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}
要寫入隊列,您可以調用實現此接口的代理。
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));
這些呼叫會產生可以傾倒如下的消息。
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}
要閱讀消息,您可以為讀者提供與您撥打相同呼叫的呼叫實現的讀者。
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());
運行此示例打印:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
有關更多詳細信息,請參閱,使用方法閱讀器/作家和MessageReaderWritertest
編年史隊列在系統上端到端時,納秒分辨率的明確或隱式分辨率正時。我們支持跨機器使用納米時間,而無需專業硬件。要啟用此功能,請設置隊列的sourceId
。
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));
每次讀取的時間戳都會從服務到服務。
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}
首先寫
先閱讀
寫入讀取的結果。
觸發此事件的原因。
在下一節中,您將找到如何使用摘錄索引。
在紀事隊的末尾找到索引
編年史隊列附錄是線程本地。實際上,當您要求:
final ExcerptAppender appender = queue.acquireAppender();
acquireAppender()
使用螺紋 - 本地池為您提供一個應用程序,該應用程序將被重複使用以減少對象創建。因此,該方法調用:
long index = appender.lastIndexAppended();
只會為您提供此Appender附加的最後一個索引;不是任何附加者附加的最後一個索引。如果您想查找寫給隊列的最後一個記錄的索引,則必須致電:
queue.lastIndex()
它將返回隊列中存在的最後一個摘錄的索引(或空排隊的-1)。請注意,如果將隊列同時寫成,則可能是一個不足的值,因為隨後的條目甚至可能在返回之前寫入。
兩個索引之間的消息數
計算可以使用兩個索引之間的消息數:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
筆記 | 您應該避免在延遲敏感代碼上調用此方法,因為如果索引在不同的周期中,則該方法可能必須從文件系統訪問.cq4文件。 |
有關此的更多信息,請參見:
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
轉到特定消息並閱讀
以下示例顯示瞭如何寫10條消息,然後移動到第五消息以讀取它
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
}
您可以添加StoreFileListener
以添加文件或不再使用的文件時通知您。這可以在一段時間後刪除文件。但是,默認情況下,文件將永遠保留。我們最大的用戶有超過100 TB的數據存儲在隊列中。
附錄和裁縫很便宜,因為它們甚至不需要TCP連接;它們只是幾個Java對象。每個裁縫保留的唯一一件事是由以下索引組成的索引:
一個週期號。例如,自時代以來的日子和
該週期內的序列號。
在DAILY
週期的情況下,序列編號為32位, index = ((long) cycle << 32) | sequenceNumber
每天提供多達40億個條目。如果預計每天會有更多消息,例如, XLARGE_DAILY
週期每天使用48位序列編號提供4萬億個條目。在我們的圖書館中打印六核索引很常見,以使看到這兩個組件變得更加容易。
我們支持每個服務器,而不是跨服務器劃分隊列文件,而是存儲與磁盤空間一樣多的數據。這比僅限於您擁有的內存空間的數量更可擴展。您可以比6TB的內存要便宜得多,您可以購買一對冗餘的6TB企業磁盤。
Chronicle隊列運行一個背景線程,以觀察低磁盤空間(請參閱net.openhft.chronicle.threads.DiskSpaceMonitor
類),因為如果磁盤空間足夠低,則JVM在分配新的內存映射文件時可能會崩潰。磁盤空間監視器檢查(對於使用編年史隊的每個文件庫):免費的不到200MB。如果是這樣,您將看到:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );
否則,它將檢查閾值百分比並記錄此消息:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );
閾值百分比由Chronicle.disk.monitor.threshold.percent System屬性控制。默認值為0。
如前所述,Chronicle隊列將其數據放在“ .cq4”文件中。因此,每當您希望將數據附加到此文件或將數據讀取到此文件中時,編年史將創建一個文件句柄。通常,編年史將每天創建一個新的“ .cq4”文件。但是,這可以更改,以便您每小時,每分鐘甚至每秒都可以創建一個新文件。
如果每秒創建一個隊列文件,我們將其稱為第二滾動。當然,每秒創建一個新文件有點極端,但這是說明以下幾點的好方法。使用第二滾動時,如果您寫了10秒的數據,然後希望讀取此數據,則編年史將不得不掃描10個文件。為了減少文件的創建,編年史隊列懶洋洋地兌現了它們,並且在將數據寫入隊列文件中,必須在關閉文件時進行謹慎的考慮,因為在大多數操作系統上,文件都將強制迫使任何已附加到文件上的數據,要沖洗到磁盤,如果我們不小心,這可能會使您的應用程序拖延。
Pretoucher
是一個旨在從長期線程中調用的類。預裝器的目的是加速隊列的寫作。調用execute()
方法後,此對象將在隊列的下面存儲文件中預觸摸頁面,以便它們在附錄被附錄到隊列要求之前居住在頁面cache(即從存儲中加載)。關閉基礎隊列時,該對象持有的資源將發布。另外,可以調用shutdown()
方法關閉所提供的隊列並發布任何其他資源。 shutdown()
被調用後的execute()
方法的調用將導致丟棄IllegalStateException
。
預感的配置參數(通過系統屬性設置)如下:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle
(默認為false):導致預感器創建下一個週期文件,而隊列仍在寫入當前的文件,以減輕創建新文件時OS中失速的影響。
警告 | 默認情況下, earlyAcquireNextCycle 將會關閉,如果要打開它,則應在打開之前和之後非常仔細的壓力測試。基本上,您所經歷的與系統有關。 |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs
(默認為2,000毫秒),預感將創建新的周期文件,這是將其寫入的Advancity的時間。有效地將預感的概念移動到了pretoucherPrerollTimeMs
中“當前”的“當前”概念。
SingleChronicleQueueExcerpts.dontWrite
(默認為false):告訴預感器永遠不會創建尚不存在的周期文件。與默認行為相反,如果預裝器在未寫入摘錄的周期內運行,則會創建“當前”週期文件。顯然,使這將阻止earlyAcquireNextCycle
起作用。
預感用法示例
上述預感的配置參數應通過系統屬性設置。例如,在以下摘錄中, earlyAcquireNextCycle
設置為true
和pretoucherPrerollTimeMs
為100ms。
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" );
設備的構造函數以將該預裝器分配給並創建一個新的預裝器的隊列名稱。然後,通過調用execute()
方法,預裝器啟動。
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
}
方法close()
,關閉預裝器並發布其資源。
pretouch . close ();
筆記 | 預感是企業功能 |
可以監視編年史隊列以實時獲得延遲,吞吐量和活動指標(即在事件觸發的微秒內)。
以下圖表顯示了需要多長時間的時間:
寫40個字節消息給紀事排隊
通過TCP複製寫作
請第二份副本確認消息
請讀取已確認的消息
測試進行了十分鐘,並繪製了潛伏期的分佈。
筆記 | 每秒約1000萬條消息的潛伏期一步;隨著消息開始批處理,它會跳躍。以低於此的速度,可以單獨發送每條消息。 |
據信,99.99%及以上是延遲信息超過TCP的延遲。需要進一步的研究來證明這一點。這些延遲是相似的,無論吞吐量如何。 99.9%和99.93%的函數是系統在延遲後能夠恢復的速度。吞吐量越高,系統必須從延遲中恢復的淨空就越少。
當禁用雙重屏障時,所有寫入隊列的寫入將根據寫鎖定的獲取來序列化。每次調用ExcerptAppender.writingDocument()
時,Appender都會嘗試在隊列上獲取寫鎖定,如果它不這樣做,則它會阻止其阻止,直到解鎖寫入鎖,然後又鎖定了隊列。
當啟用雙重屏障時,如果Appender發現在呼叫ExcerptAppender.writingDocument()
呼叫時獲取寫鎖定,它將立即返回,並指向輔助緩衝區的上下文,並且基本上是fefers lock occecisition直到context.close()
被調用(通常使用Try-Resources模式,它位於Try Block的末尾),允許用戶繼續編寫數據,然後基本上對序列化數據進行memcpy(從而降低了序列化的成本) 。默認情況下,雙重緩衝被禁用。您可以通過打電話啟用雙重屏蔽
SingleChronicleQueueBuilder.doubleBuffer(true);
筆記 | 在緩衝的寫入期間, DocumentContext.index() 將拋出IndexNotAvailableException 。這是因為不可能知道索引,直到緩衝區寫回隊列為止,這種情況只有在DocumentContext 關閉時才會發生。 |
這僅在(大多數)寫入隊列的對象足夠大,並且它們的編組不是直接的(例如Bytesmarshallable的編排非常有效且快速,因此雙重持久的事情只會放慢速度),並且寫作有很大的爭論(例如2或更多線程以很高的速度將大量數據寫入隊列)。
結果:
以下是繁瑣消息的各種數據尺寸的基準結果(請參閱net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark
),YMMV-始終執行您自己的基準:
1 kb
雙向籃板禁用:
-------------------------------------------------- -------------------------------------------------- -------------------------------------------------- ------------------------ -------------------------- ---------------------------------------- 百分比run1 run2 run3%變化 50:90.40 90.59 91.17 0.42 90:179.52 180.29 97.50 36.14 99:187.33 186.69 186.82 0.05 99.7:213.57 198.72 217.28 5.86 -------------------------------------------------- --------------------------- ----------------------- -------------------------------------------------- ---- ------------------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- -------- ------------------------------------------ ---------------------------- 百分比run1 run2 run3%變化 50:179.14 179.26 180.93 0.62 90:183.49 183.36 185.92 0.92 99:192.19 190.02 215.49 8.20 99.7:240.70 228.16 258.88 8.24 -------------------------------------------------- --------------------------- ----------------------- -------------------------------------------------- ---- -------------------
啟用了雙擋板:
-------------------------------------------------- -------------------------------------------------- -------------------------------------------------- ------------------------ -------------------------- ---------------------------------------- 百分比run1 run2 run3%變化 50:86.05 85.60 86.24 0.50 90:170.18 169.79 170.30 0.20 99:176.83 176.58 177.09 0.19 99.7:183.36 185.92 183.49 0.88 -------------------------------------------------- --------------------------- ----------------------- -------------------------------------------------- ---- ------------------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- -------- ------------------------------------------ ---------------------------- 百分比run1 run2 run3%變化 50:86.24 85.98 86.11 0.10 90:89.89 89.44 89.63 0.14 99:169.66 169.79 170.05 0.10 99.7:175.42 176.32 176.45 0.05 -------------------------------------------------- --------------------------- ----------------------- -------------------------------------------------- ---- -------------------
4 KB
雙向籃板禁用:
-------------------------------------------------- -------------------------------------------------- -------------------------------------------------- ------------------------ -------------------------- ---------------------------------------- 百分比run1 run2 run3%變化 50:691.46 699.65 701.18 0.15 90:717.57 722.69 721.15 0.14 99:752.90 748.29 748.29 0.00 99.7:1872.38 1743.36 1780.22 1.39 -------------------------------------------------- --------------------------- ----------------------- -------------------------------------------------- ---- ------------------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- -------- ------------------------------------------ ---------------------------- 百分比run1 run2 run3%變化 50:350.59 353.66 353.41 0.05 90:691.46 701.18 697.60 0.34 99:732.42 733.95 729.34 0.42 99.7:1377.79 1279.49 1302.02 1.16 -------------------------------------------------- --------------------------- ----------------------- -------------------------------------------------- ---- -------------------
啟用了雙擋板:
-------------------------------------------------- -------------------------------------------------- -------------------------------------------------- ------------------------ -------------------------- ---------------------------------------- 百分比run1 run2 run3%變化 50:342.40 344.96 344.45 0.10 90:357.25 360.32 359.04 0.24 99:688.38 691.97 691.46 0.05 99.7:1376.77 1480.19 1383.94 4.43 -------------------------------------------------- --------------------------- ----------------------- -------------------------------------------------- ---- ------------------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- -------- ------------------------------------------ ---------------------------- 百分比run1 run2 run3%變化 50:343.68 345.47 346.24 0.15 90:360.06 362.11 363.14 0.19 99:694.02 698.62 699.14 0.05 99.7:1400.32 1510.91 1435.14 3.40 -------------------------------------------------- --------------------------- ----------------------- -------------------------------------------------- ---- -------------------
如果您想調整代碼以獲取超低延遲,則可以採用類似的QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain
該代碼可以被視為基本的堆棧採樣器參考器。這是假設您將代碼基於net.openhft.chronicle.core.threads.EventLoop
,您可以定期採樣堆棧以查找攤位。建議不要將樣本率降低到50微秒以下,因為這會產生太多的噪聲
它可能會給您帶來比典型的分析師更好的粒度。由於它基於攤位所在位置的統計方法,因此需要許多樣本,以查看哪個代碼的分組最高(換句話說是最高的攤位),並且會輸出以下看起來如下的跟踪:
28在java.util.concurrent.concurrenthashmap.putval(concurrenthashmap.java:1012) 在java.util.concurrent.concurrenthashmap.put(concurrenthashmap.java:1006) 在net.openhft.chronicle.core.util.util.weakreferencecleaner.newcleaner(弱referencecleaner.java:43) 在net.openhft.chronicle.bytes.nativebytesstore。 在net.openhft.chronicle.bytes.mappedbytesstore。 在net.openhft.chronicle.bytes.mappedfile $$ lambda $ 4/1732398722.CREATE(未知來源) 在net.openhft.chronicle.bytes.mappedfile.acquirebytestore(mappedfile.java:297) 在net.openhft.chronicle.bytes.mappedfile.acquirebytestore(mappedfile.java:246) 25在net.openhft.chronicle.queue.jitter.queuewritejittermain.lambda $ main $ 1(queuewritejittermain.java:58) at net.openhft.chronicle.queue.jitter.queuewritejittermain $$ lambda $ 11/967627249.lun(未知來源) 在java.lang.thread.run(thread.java:748) 21在java.util.concurrent.concurrenthashmap.putval(concurrenthashmap.java:1027) 在java.util.concurrent.concurrenthashmap.put(concurrenthashmap.java:1006) 在net.openhft.chronicle.core.util.util.weakreferencecleaner.newcleaner(弱referencecleaner.java:43) 在net.openhft.chronicle.bytes.nativebytesstore。 在net.openhft.chronicle.bytes.mappedbytesstore。 在net.openhft.chronicle.bytes.mappedfile $$ lambda $ 4/1732398722.CREATE(未知來源) 在net.openhft.chronicle.bytes.mappedfile.acquirebytestore(mappedfile.java:297) 在net.openhft.chronicle.bytes.mappedfile.acquirebytestore(mappedfile.java:246) 14 at net.openhft.chronicle.queue.jitter.queuewritejittermain.lambda $ main $ 1(queuewritejittermain.java:54) at net.openhft.chronicle.queue.jitter.queuewritejittermain $$ lambda $ 11/967627249.lun(未知來源) 在java.lang.thread.run(thread.java:748)
從中,我們可以看到,大多數樣品(在此情況下28個樣本)在ConcurrentHashMap.putVal()
中被捕獲,如果我們希望獲得更細的穀物粒度,我們經常會添加net.openhft.chronicle.core.Jvm.safepoint
結果:
在上述測試中,典型的延遲在14至40微秒之間變化。 99個百分位數在17至56微秒之間變化,具體取決於要測試的吞吐量。值得注意的是,99.93%的延遲在21微秒至41毫秒之間變化,為2000倍。
可接受的延遲 | 吞吐量 |
<30微秒99.3% | 每秒700萬個消息 |
<20微秒99.9% | 每秒2000萬條消息 |
<1毫秒99.9% | 每秒5000萬條消息 |
<60微秒99.3% | 每秒8000萬個消息 |
批處理和隊列延遲
各種消息大小的端到端延遲圖
編年史的旨在超越其競爭對手,例如Kafka。與Apache Kafka相比,編年史隊列在更大的吞吐量和較低的潛伏期的含量下支持。儘管卡夫卡(Kafka)比許多替代方案都要快,但它與編年史隊(Chronicle Queue)支持每秒超過一百萬個事件的吞吐量的能力不符,而同時達到1至20微秒的潛伏期。
編年史隊列從單個線程到單個分區處理更多音量。這避免了對分區的複雜性和缺點的需求。
Kafka使用中級經紀人使用操作系統的文件系統和緩存,而Chronicle隊列直接使用操作系統的文件系統和緩存。有關比較,請參見Kafka文檔
大數據和編年史隊列 - 紀事隊使用的某些技術的詳細描述
常見問題 - 客戶問的問題
它的工作原理 - 對如何實施編年史的更深度
實用程序 - 列出了一些有用的公用事業,用於使用隊列文件
在Stackoverflow上支撐編年史
Google組的編年史支持
留下您的電子郵件,以獲取有關最新版本和補丁的信息,以保持最新狀態。