https://player.vimeo.com/video/201989439
Chronicle Queue is a persisted low-latency messaging framework for high performance applications.
This project covers the Java version of Chronicle Queue. A C++ version of this project is also available and supports Java/C++ interoperability plus additional language bindings, e.g. Python. If you are interested in evaluating the C++ version please contact [email protected].
At first glance Chronicle Queue can be seen as simply another queue implementation. However, it has major design choices that should be emphasised. Using off-heap storage, Chronicle Queue provides an environment where applications do not suffer from garbage collection (GC). When implementing high-performance and memory-intensive applications (you heard the fancy term "BigData"?) in Java, one of the biggest problems is garbage collection.
Chronicle Queue allows messages to be added to the end of a queue ("appended"), read from the queue ("tailed"), and also supports random-access seeks.
You could consider a Chronicle Queue to be similar to a low-latency broker with durable/persisted topics that can contain messages of different types and sizes. Chronicle Queue is a distributed unbounded persisted queue that:
supports asynchronous RMI and publish/subscribe interfaces with microsecond latencies,
passes messages between JVMs in under a microsecond,
passes messages between JVMs on different machines via replication in under 10 microseconds (an Enterprise feature),
provides stable, soft real-time latencies into the millions of messages per second for a single queue, with total ordering of every event.
When publishing 40-byte messages, a high percentage of the time we achieve latencies under 1 microsecond. The 99th percentile latency is the worst 1 in 100, and the 99.9th percentile is the worst 1 in 1,000 latencies.
Batch size | 10 million events per minute | 60 million events per minute | 100 million events per minute |
---|---|---|---|
99%ile | 0.78 µs | 0.78 µs | 1.2 µs |
99.9%ile | 1.2 µs | 1.3 µs | 1.5 µs |
Batch size | 10 million events per minute | 60 million events per minute | 100 million events per minute |
---|---|---|---|
99%ile | 20 µs | 28 µs | 176 µs |
99.9%ile | 901 µs | 705 µs | 5,370 µs |
Note | 100 million events per minute is sending an event every 660 nanoseconds; replicated and persisted. |
Important | This performance is not achieved using a large machine. It is achieved using one thread to publish, and one thread to consume. |
Chronicle Queue is designed to:
be a "record everything store" which can be read with microsecond real-time latency. This supports even the most demanding high-frequency trading systems; however, it can be used in any application where the recording of information is a concern.
support reliable replication with notification to either the appender (writer of messages) or a tailer (reader of messages), when a message has been successfully replicated.
Chronicle Queue assumes disk space is cheap compared with memory. Chronicle Queue makes full use of the disk space you have, so you are not limited by the main memory of your machine. If you use spinning HDDs, you can store many TBs of disk space for little cost.
The only extra software that Chronicle Queue needs to run is the operating system. It doesn't have a broker; instead it uses your operating system to do all the work. If your application dies, the operating system keeps running for seconds longer, so no data is lost, even without replication.
As Chronicle Queue stores all saved data in memory-mapped files, this has a trivial on-heap overhead, even if you have over 100 TB of data.
Chronicle Queue goes to great lengths to achieve very low latency. In other products which focus on support of web applications, latencies of less than 40 milliseconds are fine as they are faster than you can see; for example, the frame rate of cinema is 24 Hz, or about 40 ms.
Chronicle Queue aims to achieve latencies of under 40 microseconds for 99% to 99.99% of the time. Using Chronicle Queue without replication, we support applications with latencies below 40 microseconds end-to-end across multiple services. Often the 99% latency of Chronicle Queue is entirely dependent on the choice of operating system and hard disk subsystem.
Replication for Chronicle Queue is supported by Chronicle Queue Enterprise. This supports real-time compression which calculates the deltas for individual objects as they are written. This can reduce the size of messages by a factor of 10, or better, without the need for batching; that is, without introducing significant latency.
Chronicle Queue also supports LZW, Snappy, and GZIP compression. However, these formats add significant latency. They are only useful if you have strict limitations on network bandwidth.
Chronicle Queue supports a number of semantics:
Every message is replayed on restart.
Only new messages are played on restart.
Restart from any known point using the index of the entry.
Replay only the messages you have missed. This is supported directly using the MethodReader/MethodWriter builders; a sketch follows this list.
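Below is a minimal sketch of that restart-and-replay pattern using a named tailer and a MethodReader; the TradeListener interface, handler body, and directory name are illustrative, not part of Chronicle Queue:
import net.openhft.chronicle.bytes.MethodReader;
import net.openhft.chronicle.queue.ChronicleQueue;

public class ReplayMissedSketch {
    // Illustrative single-method listener; define whatever interface matches your messages.
    public interface TradeListener {
        void onTrade(String trade);
    }

    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.singleBuilder("queue-dir").build()) {
            TradeListener handler = trade -> System.out.println("processing " + trade);
            // A named tailer persists its read position inside the queue, so after a
            // restart readOne() resumes from the first message not yet processed.
            MethodReader reader = queue.createTailer("trade-processor").methodReader(handler);
            while (reader.readOne()) {
                // each readOne() replays exactly one missed message as a method call
            }
        }
    }
}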
On most systems, System.nanoTime() is roughly the number of nanoseconds since the system last rebooted (although different JVMs may behave differently). This is the same across JVMs on the same machine, but wildly different between machines. The absolute difference when it comes to machines is meaningless; however, the information can be used to detect outliers. You can't determine what the best latency is, but you can determine how far off the best latencies you are. This is useful if you are focusing on the 99th percentile latencies. We have a class called RunningMinimum to obtain timings from different machines, while compensating for a drift in the nanoTime between machines. The more often you take measurements, the more accurate this running minimum is.
Chronicle Queue manages storage by cycle. You can add a StoreFileListener which will notify you when a file is added, and when it is no longer retained. You can move, compress, or delete all the messages for a day at once. Note: unfortunately on Windows, if an IO operation is interrupted, it can close the underlying FileChannel.
For performance reasons, we have removed checking for interrupts in the Chronicle Queue code. Because of this, we recommend that you avoid using Chronicle Queue with code that generates interrupts. If you cannot avoid generating interrupts, then we suggest that you create a separate instance of Chronicle Queue per thread, as sketched below.
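A minimal sketch of that per-thread pattern; the directory name is illustrative, and each thread gets its own ChronicleQueue instance over the same underlying queue files:
import net.openhft.chronicle.queue.ChronicleQueue;

public class PerThreadQueues {
    private static final String QUEUE_PATH = "shared-queue-dir"; // same data for all threads

    // One ChronicleQueue instance per thread, so an interrupt that closes one thread's
    // FileChannel on Windows cannot affect the instances used by other threads.
    private static final ThreadLocal<ChronicleQueue> QUEUE = ThreadLocal.withInitial(
            () -> ChronicleQueue.singleBuilder(QUEUE_PATH).build());

    public static ChronicleQueue queueForThisThread() {
        return QUEUE.get();
    }
}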
Chronicle Queue is most often used for producer-centric systems where you need to retain a lot of data for days or years. For statistics, see Usage of Chronicle Queue.
Important | Chronicle Queue does not support operating off any network file system, be it NFS, AFS, SAN-based storage, or anything else. The reason for this is that those file systems do not provide all the required primitives for the memory-mapped files Chronicle Queue uses. If any networking is needed (e.g. to make the data accessible to multiple hosts), the only supported way is Chronicle Queue Replication (an Enterprise feature). |
Most messaging systems are consumer-centric. Flow control is implemented to avoid the consumer ever getting overloaded, even momentarily. A common example is a server supporting multiple GUI users. Those users might be on different machines (OS and hardware), over different qualities of network (latency and bandwidth), doing a variety of other things at different times. For this reason it makes sense for the client consumer to tell the producer when to back off, delaying any data until the consumer is ready to take more data.
Chronicle Queue is a producer-centric solution and does everything possible to never push back on the producer, or tell it to slow down. This makes it a powerful tool, providing a big buffer between your system and an upstream producer over which you have little, or no, control.
Market data publishers don't give you the option to push back on the producer for long, if at all. A few of our users consume data from CME OPRA. This produces peaks of 10 million events per minute, sent as UDP packets without any retry. If you miss or drop a packet, it is lost. You have to consume and record those packets as fast as they come to you, with very little buffering in the network adapter. For market data in particular, real time means in a few microseconds; it doesn't mean intra-day (during the day).
Chronicle Queue is fast and efficient, and has been used to increase the speed at which data is passed between threads. In addition, it also keeps a record of every message passed, allowing you to significantly reduce the amount of logging that you need to do.
Compliance systems are required by more and more systems these days. Everyone has to have them, but no one wants to be slowed down by them. By using Chronicle Queue to buffer data between monitored systems and the compliance system, you don't need to worry about the impact of compliance recording on your monitored systems. Again, Chronicle Queue can support millions of events per second, per server, and access data retained for years.
Chronicle Queue supports low-latency IPC (Inter-Process Communication) between JVMs on the same machine in the order of magnitude of 1 microsecond, as well as between machines with a typical latency of 10 microseconds for modest throughputs of a few hundred thousand messages per second. Chronicle Queue supports throughputs of millions of events per second, with stable microsecond latencies.
See the article on using Chronicle Queue in microservices.
Chronicle Queue can be used to build state machines. All the information about the state of those components can be reproduced externally, without direct access to the components, or to their state. This significantly reduces the need for additional logging; however, any logging you do need can be recorded in great detail. This makes enabling DEBUG logging in production practical, because the cost of logging is very low: less than 10 microseconds. Logs can also be replicated centrally for log consolidation. Chronicle Queue is used to store 100+ TB of data, which can be replayed from any point in time.
Non-batching streaming components are highly performant, deterministic, and reproducible. You can reproduce bugs which only show up after a million events played in a particular order, with accelerated realistic timings. This makes stream processing attractive for systems which need a high degree of quality of outcome.
Releases are available on Maven Central as:
<dependency>
  <groupId>net.openhft</groupId>
  <artifactId>chronicle-queue</artifactId>
  <version><!-- replace with the latest version, see below --></version>
</dependency>
See the Chronicle Queue Release Notes to get the latest version number. Snapshots are available on https://oss.sonatype.org
Note | Classes that reside in any of the packages 'internal', 'impl' and 'main' (the latter contains various runnable main methods), and any of their sub-packages, are not part of the public API and may become subject to change at any time for any reason. See the respective package-info.java files for details. |
Tailers are now read-only. In Chronicle Queue v4 we had the concept of lazy indexing, where the appender would not write indexes, but instead indexing could be done by the tailer. We decided to drop lazy indexing in v5; making tailers read-only not only simplifies Chronicle Queue, but also allows us to add optimisations elsewhere in the code.
The locking model of Chronicle Queue changed in v5. In v4, the write lock (which prevents concurrent writes to the queue) resided in the .cq4 files. In v5 this moved to a single file called the table store (metadata.cq4t). This simplifies the internal locking code, as only the table store file needs to be inspected.
You can use Chronicle Queue v5 to read messages written with Chronicle Queue v4, but this is not guaranteed to always work - for example, if you created your v4 queue with wireType(WireType.FIELDLESS_BINARY), then Chronicle Queue v5 will not be able to read the queue's headers. We have some tests for v5 reading v4 queues, but they are limited, and not all scenarios may be supported.
You cannot write to a Chronicle Queue v4 queue using Chronicle Queue v5.
Chronicle Queue v4 was a complete re-write of Chronicle Queue that solved the following issues that existed in v3.
Without self-describing messages, users had to create their own functionality for dumping messages and storing data long-term. With v4, you don't have to do this, but you can if you wish.
Vanilla Chronicle Queue would create a file per thread. This is fine if the number of threads is controlled; however, many applications have little or no control over how many threads are used, and this caused usability problems.
The configuration for Indexed and Vanilla Chronicle was entirely in code, so the reader had to have the same configuration as the writer, and it was not always clear what that was.
There was no way for the producer to know how much data had been replicated to a second machine. The only workaround was to replicate data back to the producer.
You needed to specify the size of data to reserve before you started writing messages.
You needed to do your own locking for the appender when using Indexed Chronicle.
In Chronicle Queue v3, everything was in terms of bytes, not wire. There are two ways to use bytes in Chronicle Queue v4. You can use the writeBytes and readBytes methods, or you can get the bytes() from the wire. For example:
appender.writeBytes(b -> b.writeInt(1234).writeDouble(1.111));
boolean present = tailer.readBytes(b -> process(b.readInt(), b.readDouble()));
try (DocumentContext dc = appender.writingDocument()) {
    Bytes<?> bytes = dc.wire().bytes();
    // write to bytes
}
try (DocumentContext dc = tailer.readingDocument()) {
    if (dc.isPresent()) {
        Bytes<?> bytes = dc.wire().bytes();
        // read from bytes
    }
}
Chronicle Queue Enterprise is a commercially supported version of our successful open source Chronicle Queue. The documentation below extends the open source documentation to describe the additional features that are available when you are licensed for the Enterprise Edition. These are:
Encryption of message queues and messages. For more information see the Encryption documentation.
TCP/IP (and optionally UDP) replication between hosts to ensure real-time backup of all your queue data. For more information see the Replication documentation; the queue replication protocol is covered in Replication Protocol.
Timezone support for daily queue roll-over scheduling. For more information see Timezone support.
Async mode support for improved performance at high throughput on slower file systems. For more information see Async mode, and also Performance.
Pretoucher for improved outliers; see Pretoucher and its configuration.
In addition, you will be fully supported by our technical experts.
For more information on Chronicle Queue Enterprise, please contact [email protected].
Chronicle Queue is built around SingleChronicleQueue.class, which is designed to support:
rolling files on a daily, weekly or hourly basis,
concurrent writers on the same machine,
concurrent readers on the same machine, or across multiple machines via TCP replication (with Chronicle Queue Enterprise),
concurrent readers and writers between Docker or other containerised workloads,
zero copy serialization and deserialization,
millions of writes/reads per second on commodity hardware.
Approximately 5 million messages/second for 96-byte messages on an i7-4790 processor. A queue directory structure is as follows:
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling.
The format consists of size-prefixed bytes which are formatted using BinaryWire or TextWire. Chronicle Queue is designed to be driven from code. You can easily add an interface which suits your needs.
Note | Due to fairly low-level operations, Chronicle Queue read/write operations can throw unchecked exceptions. In order to prevent thread death, it might be practical to catch RuntimeExceptions and log/analyse them as appropriate. |
Note | See the demo for an example of how to use Chronicle Queue. |
In the following sections, we first introduce some terminology and a quick reference for using Chronicle Queue. Then we provide a more detailed guide.
Chronicle Queue is a persisted journal of messages which supports concurrent writers and readers, even across multiple JVMs on the same machine. Every reader sees every message, and a reader can join at any time and still see every message.
Note | We deliberately avoid the term consumer and instead use reader, as reading does not consume/destroy messages. |
Chronicle Queue has the following main concepts:
Excerpt
An excerpt is the main data container in a Chronicle Queue. In other words, each Chronicle Queue is composed of excerpts. Writing a message to a Chronicle Queue means starting a new excerpt, writing the message into it, and finishing the excerpt at the end.
Appender
An appender is the source of messages; something like an iterator in the Chronicle environment. You add data by appending it to the current Chronicle Queue. It can perform sequential writes by appending to the end of the queue only. There is no way to insert, or delete, excerpts.
Tailer
A tailer is an excerpt reader optimised for sequential reads. It can perform sequential and random reads, both forwards and backwards. Tailers read the next available message each time they are called. The following is guaranteed by Chronicle Queue:
for each appender, messages are written in the order the appender wrote them; messages by different appenders are interleaved,
for each tailer, it will see every message for a topic in the same order as every other tailer,
when replicated, every replica has a copy of every message.
Chronicle Queue does not use a broker. If you need an architecture with a broker, please contact [email protected].
File rolling and queue files
Chronicle Queue is designed to roll its files depending on the roll cycle chosen when the queue is created (see RollCycles). In other words, a queue file is created for each roll cycle, with the file extension cq4. When the roll cycle reaches the point it should roll, the appender will atomically write an EOF mark at the end of the current file to indicate that no other appender should write to it and no tailer should read further; instead everyone should use the new file.
If the process was shut down, and restarted later when the roll cycle should be using a new file, an appender will try to locate the old files and write an EOF mark in them to help tailers reading them.
Topics
Each topic is a directory of queue files. If you have a topic called mytopic, the layout could look like this:
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4
To copy all the data for a single day (or cycle), you can simply copy that day's file onto your development machine for replay testing.
Restrictions on topics and messages
Topics are limited to being strings which can be used as directory names. Within a topic, you can have sub-topics which can be any data type that can be serialized. Messages can be any serializable data.
Chronicle Queue supports:
Serializable objects, though this is to be avoided as it is not efficient
Externalizable objects are preferred if you wish to use standard Java APIs.
byte[] and String
Marshallable; a self-describing message which can be written as YAML, Binary YAML, or JSON.
BytesMarshallable which is low-level binary, or text encoding.
A small sketch of a Marshallable message class follows.
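Below is a minimal sketch of a self-describing message type based on chronicle-wire's SelfDescribingMarshallable; the class and field names are illustrative, not part of Chronicle Queue:
import net.openhft.chronicle.wire.SelfDescribingMarshallable;

// A self-describing message: fields are serialised with their names and types,
// so it can be dumped as YAML/JSON and survives adding fields later.
public class TradeEvent extends SelfDescribingMarshallable {
    private String symbol;
    private double price;
    private long quantity;

    public TradeEvent(String symbol, double price, long quantity) {
        this.symbol = symbol;
        this.price = price;
        this.quantity = quantity;
    }
}
// Usage sketch: appender.writeDocument(new TradeEvent("EURUSD", 1.1101, 15_000_000));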
This section provides a quick reference to briefly show how to create, write to, and read from, a Chronicle Queue.
Chronicle Queue construction
Creating an instance of Chronicle Queue is a little different from just calling a constructor. To create an instance you have to use the ChronicleQueueBuilder.
String basePath = OS.getTarget() + "/getting-started";
ChronicleQueue queue = SingleChronicleQueueBuilder.single(basePath).build();
In this example we have created an IndexedChronicle, which creates two RandomAccessFiles; one for indexes, and one for data, having names relative to:
${java.io.tmpdir}/getting-started/{today}.cq4
Writing to a queue
// Obtains an ExcerptAppender
ExcerptAppender appender = queue.acquireAppender();
// Writes: {msg: TestMessage}
appender.writeDocument(w -> w.write("msg").text("TestMessage"));
// Writes: TestMessage
appender.writeText("TestMessage");
Reading from a queue
// Creates a tailer
ExcerptTailer tailer = queue.createTailer();
tailer.readDocument(w -> System.out.println("msg: " + w.read(() -> "msg").text()));
assertEquals("TestMessage", tailer.readText());
Also, the ChronicleQueue.dump() method can be used to dump the raw contents as a string.
queue.dump();
Cleanup
Chronicle Queue stores its data off-heap, and it is recommended that you call close() once you have finished working with Chronicle Queue, to free resources.
Note | No data will be lost if you do this. It only cleans up the resources that were used. |
queue.close();
Putting it all together
try (ChronicleQueue queue = SingleChronicleQueueBuilder.single("queue-dir").build()) {
    // Obtain an ExcerptAppender
    ExcerptAppender appender = queue.acquireAppender();
    // Writes: {msg: TestMessage}
    appender.writeDocument(w -> w.write("msg").text("TestMessage"));
    // Writes: TestMessage
    appender.writeText("TestMessage");
    ExcerptTailer tailer = queue.createTailer();
    tailer.readDocument(w -> System.out.println("msg: " + w.read(() -> "msg").text()));
    assertEquals("TestMessage", tailer.readText());
}
You can configure Chronicle Queue using its configuration parameters or system properties. In addition, there are different ways of writing to/reading from the queue, such as the use of proxies and the use of MethodReader and MethodWriter.
Chronicle Queue (CQ) can be configured via a number of methods on the SingleChronicleQueueBuilder class. Some of the parameters most queried by our customers are explained below.
RollCycle
The RollCycle parameter configures the rate at which CQ will roll the underlying queue files. For example, using the following code snippet will result in the queue files being rolled (i.e. a new file created) every hour:
ChronicleQueue.singleBuilder(queuePath).rollCycle(RollCycles.HOURLY).build()
Once a queue's roll-cycle has been set, it cannot be changed at a later date. Any further instances of SingleChronicleQueue configured to use the same path should be configured to use the same roll-cycle; if they are not, the roll-cycle will be updated to match the persisted roll-cycle. In this case, a warning log message will be printed in order to notify the library user of the situation:
// Creates a queue with roll-cycle MINUTELY
try (ChronicleQueue minuteRollCycleQueue = ChronicleQueue.singleBuilder(queueDir).rollCycle(MINUTELY).build()) {
    // Creates a queue with roll-cycle HOURLY
    try (ChronicleQueue hourlyRollCycleQueue = ChronicleQueue.singleBuilder(queueDir).rollCycle(HOURLY).build()) {
        try (DocumentContext documentContext = hourlyRollCycleQueue.acquireAppender().writingDocument()) {
            documentContext.wire().write("somekey").text("somevalue");
        }
    }
    // Now try to append using the queue configured with roll-cycle MINUTELY
    try (DocumentContext documentContext2 = minuteRollCycleQueue.acquireAppender().writingDocument()) {
        documentContext2.wire().write("otherkey").text("othervalue");
    }
}
Console output:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.
The maximum number of messages that can be stored in a queue file depends on the roll cycle. See the FAQ for more information on this.
In Chronicle Queue, roll-over time is based on UTC. The Timezone Rollover Enterprise feature extends Chronicle Queue's ability to specify the time and periodicity of queue roll-overs, rather than UTC. For more information see Timezone Queue Rollover.
Chronicle Queue's FileUtil class provides useful methods for managing queue files. See Managing roll files directly.
WireType
It is possible to configure how Chronicle Queue will store the data by explicitly setting the WireType:
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )
例如:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )
While the WireType can be explicitly provided when creating the builder, not all wire types are supported by Chronicle Queue. In particular, the following wire types are not supported:
TEXT (and essentially anything based on text, including JSON and CSV)
RAW
READ_ANY
blockSize
When a queue is read/written, part of the file currently being read/written is mapped to a memory segment. This parameter controls the size of the memory-mapped chunk. You can change this parameter using the method SingleChronicleQueueBuilder.blockSize(long blockSize).
Note | You should avoid changing the blockSize unnecessarily. |
If you are sending large messages, then you should set a large blockSize; that is, the blockSize should be at least four times the message size.
Warning | If you use a small blockSize for large messages, you will receive an IllegalStateException and the write is aborted. |
We recommend that you use the same blockSize for each instance of a replicated queue. The blockSize is not written to the queue's metadata, so ideally it should be set to the same value when creating the instances of a replicated Chronicle Queue (this is recommended, but not essential if you wish to run with a different blockSize).
Tip | Use the same blockSize for each instance of a replicated queue. |
indexSpacing
This parameter sets the spacing between excerpts that are explicitly indexed. A higher number means better sequential write performance, but slower random-access reads. Sequential read performance is not affected by this property. For example, the following default index spacings apply:
16 (MINUTELY)
64 (DAILY)
You can change this parameter using SingleChronicleQueueBuilder.indexSpacing(int indexSpacing). A sketch of configuring blockSize and indexSpacing together follows.
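A minimal sketch of setting these parameters on the builder; the path and the values shown are illustrative, not recommendations:
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;

ChronicleQueue queue = ChronicleQueue.singleBuilder("queue-dir")
        .rollCycle(RollCycles.DAILY)
        .blockSize(256L << 20)   // 256 MB memory-mapped blocks, e.g. when writing large messages
        .indexSpacing(16)        // explicitly index every 16th excerpt
        .build();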
indexCount
The size of each index array, as well as the total number of index arrays per queue file.
Note | indexCount² is the maximum number of indexed queue entries. |
Note | See Excerpt indexing in Chronicle Queue in this user guide for more information and examples of using indexing. |
readBufferMode, writeBufferMode
These parameters define the BufferMode for reads or writes, with the following options:
None - The default (and the only option available to open source users); no buffering;
Copy - Used in conjunction with encryption;
Asynchronous - Use an asynchronous buffer when reading and/or writing, provided by Chronicle Async Mode.
Buffering
Using bufferMode: Asynchronous
In Chronicle Queue we refer to the act of writing your data to the Chronicle Queue as storing an excerpt. This data could be made up of any data type, including text, numbers, or serialised blobs. Ultimately, all your data, regardless of what it is, is stored as a series of bytes.
Just before storing your excerpt, Chronicle Queue reserves a 4-byte header. Chronicle Queue writes the length of your data into this header. This way, when Chronicle Queue comes to read your excerpt, it knows how long each blob of data is. We refer to this 4-byte header, along with your excerpt, as a document. Strictly speaking, Chronicle Queue can be used to read and write documents.
Note | Within this 4-byte header we also reserve a few bits for a number of internal operations, such as locking, to make Chronicle Queue thread-safe across both processors and threads. The important thing to note is that, because of this, you can't strictly convert the 4 bytes into an integer to find the length of your data blob. |
As mentioned earlier, Chronicle Queue uses an appender to write to the queue and a tailer to read from the queue. Unlike with other Java queuing solutions, messages are not lost when they are read with a tailer. This is covered in more detail in the section below on "Reading from a queue using a tailer". To write data to a Chronicle Queue, you must first create an appender:
try (ChronicleQueue queue = ChronicleQueue.singleBuilder(path + "/trades").build()) {
    final ExcerptAppender appender = queue.acquireAppender();
}
Chronicle Queue uses the following low-level interface to write the data:
try (final DocumentContext dc = appender.writingDocument()) {
    dc.wire().write().text("your text data");
}
It is the close of the try-with-resources block that writes the length of the data into the header. You can also use the DocumentContext to find out the index that your data has just been assigned (see below). You can later use this index to move to/look up this excerpt. Each Chronicle Queue excerpt has a unique index.
try (final DocumentContext dc = appender.writingDocument()) {
    dc.wire().write().text("your text data");
    System.out.println("your data was stored to index=" + dc.index());
}
The higher-level methods below, such as writeText(), are convenience methods that call appender.writingDocument(), but both approaches essentially do the same thing. The actual code of writeText(CharSequence text) looks like this:
/**
 * @param text the message to write
 */
void writeText(CharSequence text) {
    try (DocumentContext dc = writingDocument()) {
        dc.wire().bytes().append8bit(text);
    }
}
So you have a choice of a number of high-level interfaces, down to a low-level API, down to raw memory.
This is the highest-level API, which hides the fact that you are writing to messaging at all. The benefit is that you can swap calls to the interface with a real component, or with an interface to a different protocol.
// using the method writer interface.
RiskMonitor riskMonitor = appender.methodWriter(RiskMonitor.class);
final LocalDateTime now = LocalDateTime.now(Clock.systemUTC());
riskMonitor.trade(new TradeDetails(now, "GBPUSD", 1.3095, 10e6, Side.Buy, "peter"));
You can write a "self-describing message". Such messages can support schema changes. They are also easier to understand when debugging or diagnosing problems.
// writing a self describing message
appender.writeDocument(w -> w.write("trade").marshallable(
        m -> m.write("timestamp").dateTime(now)
                .write("symbol").text("EURUSD")
                .write("price").float64(1.1101)
                .write("quantity").float64(15e6)
                .write("side").object(Side.class, Side.Sell)
                .write("trader").text("peter")));
You can write "raw data" which is self-describing. The types will always be correct; position is the only indication as to the meaning of those values.
// writing just data
appender.writeDocument(w -> w
        .getValueOut().int32(0x123456)
        .getValueOut().int64(0x999000999000L)
        .getValueOut().text("Hello World"));
You can write "raw data" which is not self-describing. Your reader must know what this data means, and the types that were used.
// writing raw data
appender.writeBytes(b -> b
        .writeByte((byte) 0x12)
        .writeInt(0x345678)
        .writeLong(0x999000999000L)
        .writeUtf8("Hello World"));
The lowest-level way to write data is illustrated below. You get an address to the raw memory, and you can write whatever you want.
// Unsafe low level
appender.writeBytes(b -> {
    long address = b.address(b.writePosition());
    Unsafe unsafe = UnsafeMemory.UNSAFE;
    unsafe.putByte(address, (byte) 0x12);
    address += 1;
    unsafe.putInt(address, 0x345678);
    address += 4;
    unsafe.putLong(address, 0x999000999000L);
    address += 8;
    byte[] bytes = "Hello World".getBytes(StandardCharsets.ISO_8859_1);
    unsafe.copyMemory(bytes, Jvm.arrayByteBaseOffset(), null, address, bytes.length);
    b.writeSkip(1 + 4 + 8 + bytes.length);
});
You can print the contents of the queue. You can see that the first two, and the last two, messages store the same data.
// dump the content of the queue
System.out.println(queue.dump());
prints:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World
Reading the queue follows the same pattern as writing, except there is a possibility that there is no message when you attempt to read one.
try (ChronicleQueue queue = ChronicleQueue.singleBuilder(path + "/trades").build()) {
    final ExcerptTailer tailer = queue.createTailer();
}
You can turn each message into a method call based on the content of the message, and have Chronicle Queue automatically deserialize the method arguments. Calling reader.readOne() will automatically skip over (filter out) any messages that do not match your method reader.
// reading using method calls
RiskMonitor monitor = System.out::println;
MethodReader reader = tailer.methodReader(monitor);
// read one message
assertTrue(reader.readOne());
You can decode the message yourself.
Note | The names, types, and order of the fields don't have to match. |
assertTrue(tailer.readDocument(w -> w.read("trade").marshallable(
        m -> {
            LocalDateTime timestamp = m.read("timestamp").dateTime();
            String symbol = m.read("symbol").text();
            double price = m.read("price").float64();
            double quantity = m.read("quantity").float64();
            Side side = m.read("side").object(Side.class);
            String trader = m.read("trader").text();
            // do something with values.
        })));
You can read self-describing data values. This will check that the types are correct, and convert as required.
assertTrue(tailer.readDocument(w -> {
    ValueIn in = w.getValueIn();
    int num = in.int32();
    long num2 = in.int64();
    String text = in.text();
    // do something with values
}));
You can read raw data as primitives and strings.
assertTrue(tailer.readBytes(in -> {
    int code = in.readByte();
    int num = in.readInt();
    long num2 = in.readLong();
    String text = in.readUtf8();
    assertEquals("Hello World", text);
    // do something with values
}));
Or, you can get the underlying memory address and access the native memory.
assertTrue(tailer.readBytes(b -> {
    long address = b.address(b.readPosition());
    Unsafe unsafe = UnsafeMemory.UNSAFE;
    int code = unsafe.getByte(address);
    address++;
    int num = unsafe.getInt(address);
    address += 4;
    long num2 = unsafe.getLong(address);
    address += 8;
    int length = unsafe.getByte(address);
    address++;
    byte[] bytes = new byte[length];
    unsafe.copyMemory(null, address, bytes, Jvm.arrayByteBaseOffset(), bytes.length);
    String text = new String(bytes, StandardCharsets.UTF_8);
    assertEquals("Hello World", text);
    // do something with values
}));
Note | Every tailer sees every message. |
An abstraction can be added to filter messages, or to assign messages to just one message processor. However, in general you only need one main tailer for a topic, with possibly some supporting tailers for monitoring etc.
As Chronicle Queue doesn't partition its topics, you get total ordering of all messages within that topic. Across topics, there is no guarantee of ordering; if you want to replay deterministically from a system which consumes from multiple topics, we suggest replaying from that system's output.
Chronicle Queue tailers may create file handlers. The file handlers are cleaned up whenever the associated Chronicle Queue's close() method is invoked, or whenever the JVM runs a garbage collection. If you are writing your code to have no GC pauses and you explicitly want to clean up the file handlers, you can call the following:
((StoreTailer) tailer).releaseResources()
ExcerptTailer.toEnd()
In some applications, it may be necessary to start reading from the end of the queue (e.g. in a restart scenario). For this use-case, ExcerptTailer provides the toEnd() method. When the tailer direction is FORWARD (the default, or as set by the ExcerptTailer.direction method), calling toEnd() will place the tailer just after the last existing record in the queue. In this case, the tailer is now ready for reading any new records appended to the queue. Until any new messages are appended to the queue, there will be no new DocumentContext available for reading:
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer.toEnd().readingDocument().isPresent();
If it is instead necessary to read backwards through the queue from the end, then the tailer can be set to read backwards:
ExcerptTailer tailer = queue.createTailer();
tailer.direction(TailerDirection.BACKWARD).toEnd();
When reading backwards, the toEnd() method will move the tailer to the last record in the queue. If the queue is not empty, then there will be a DocumentContext available for reading:
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer.toEnd().direction(TailerDirection.BACKWARD)
        .readingDocument().isPresent();
Also known as named tailers.
It can be useful to have a tailer which continues reading from where it was up to, on restart of the application.
try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(tmp).build()) {
    ExcerptTailer atailer = cq.createTailer("a");
    assertEquals("test 0", atailer.readText());
    assertEquals("test 1", atailer.readText());
    assertEquals("test 2", atailer.readText()); // (1)
    ExcerptTailer btailer = cq.createTailer("b");
    assertEquals("test 0", btailer.readText()); // (3)
}
try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(tmp).build()) {
    ExcerptTailer atailer = cq.createTailer("a");
    assertEquals("test 3", atailer.readText()); // (2)
    assertEquals("test 4", atailer.readText());
    assertEquals("test 5", atailer.readText());
    ExcerptTailer btailer = cq.createTailer("b");
    assertEquals("test 1", btailer.readText()); // (4)
}
Tailer "a" last reads message 2
Tailer "a" next reads message 3
Tailer "b" last reads message 0
Tailer "b" next reads message 1
This is from the RestartableTailerTest where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart(), toEnd(), moveToIndex() or reads a message.
Note | The direction() is not preserved across restarts, only the next index to be read. |
Note | The index of a tailer is only progressed when DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4
:
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
This can often be a bit difficult to read, so it is better to dump the cq4
files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain
or net.openhft.chronicle.queue.ChronicleReaderMain
. DumpMain
performs a simple dump to the terminal while ChronicleReaderMain
handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4
file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar
, from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
Tip | To find out which version of jars to include, please refer to the chronicle-bom. |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
This will dump the 19700101-02.cq4
file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining
Note | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh
located in the Chonicle-Queue/bin
-folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/dump_queue.sh <file path>
ChronicleReaderMain
The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain
(in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain
:
usage: ChronicleReaderMain
 -a <binary-arg>                            Argument to pass to binary search class
 -b <binary-search>                         Use this class as a comparator to binary search
 -cbl <content-based-limiter>               Specify a content-based limiter
 -cblArg <content-based-limiter-argument>   Specify an argument for use by the content-based limiter
 -d <directory>                             Directory containing chronicle queue files
 -e <exclude-regex>                         Do not display records containing this regular expression
 -f                                         Tail behaviour - wait for new records to arrive
 -g                                         Show message history (when using method reader)
 -h                                         Print this help and exit
 -i <include-regex>                         Display records containing this regular expression
 -k                                         Read the queue in reverse
 -l                                         Squash each output message into a single line
 -m <max-history>                           Show this many records from the end of the data set
 -n <from-index>                            Start reading from this index (eg 0x123ABE)
 -named <named>                             Named tailer ID
 -r <as-method-reader>                      Use when reading from a queue generated using a MethodWriter
 -s                                         Display index
 -w <wire-type>                             Control output ie JSON
 -x <max-results>                           Limit the number of results to output
 -z                                         Print timestamps using the local timezone
Just as with DumpQueue
you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <includes>
                  <include>net/openhft/**</include>
                  <include>software/chronicle/**</include>
                </includes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
Once the Uber jar is present, you can run ChronicleReaderMain
from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh
which again is located in the Chonicle-Queue/bin
-folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/queue_reader.sh <options>
ChronicleWriter
If using MethodReader
and MethodWriter
then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain
or the shell script queue_writer.sh
eg
usage: ChronicleWriterMain files.. -d <directory> [-i <interface>] -m <method>
Missing required options: m, d
 -d <directory>   Directory containing chronicle queue to write to
 -i <interface>   Interface to write via
 -m <method>      Method name
If you want to write to the below "doit" method
public interface MyInterface {
    void doit(DTO dto);
}
public class DTO extends SelfDescribingMarshallable {
    private int age;
    private String name;
}
Then you can call ChronicleWriterMain -d queue doit x.yaml
with either (or both) of the below Yamls:
{
  age: 19,
  name: Henry
}
or
!x.y.z.DTO {
  age: 42,
  name: Percy
}
If DTO
makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface
, where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net.openhft.chronicle.wire.SelfDescribingMarshallable;

interface MessageListener {
    void method1(Message1 message);

    void method2(Message2 message);
}

static class Message1 extends SelfDescribingMarshallable {
    String text;

    public Message1(String text) {
        this.text = text;
    }
}

static class Message2 extends SelfDescribingMarshallable {
    long number;

    public Message2(long number) {
        this.number = number;
    }
}
To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue.singleBuilder(path).build();
MessageListener writer1 = queue1.acquireAppender().methodWriter(MessageListener.class);
// call method on the interface to send messages
writer1.method1(new Message1("hello"));
writer1.method2(new Message2(234));
These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}
To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which prints each method called on it
MessageListener processor = ObjectUtils.printAll(MessageListener.class);
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1.createTailer().methodReader(processor);
assertTrue(reader1.readOne());
assertTrue(reader1.readOne());
assertFalse(reader1.readOne());
Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId
of the queue.
ChronicleQueue out = ChronicleQueue.singleBuilder(queuePath)
        ...
        .sourceId(1)
        .build();

SidedMarketDataListener combiner = out.acquireAppender()
        .methodWriterBuilder(SidedMarketDataListener.class)
        .get();

combiner.onSidedPrice(new SidedPrice("EURUSD1", 123456789000L, Side.Sell, 1.1172, 2e6));
A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}
First write
First read
Write of the result of the read.
What triggered this event.
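On the reading side, the recorded history of the message currently being processed can also be inspected programmatically. This is a hedged sketch: it assumes chronicle-wire's MessageHistory.get() together with the timings()/timing(n) accessors as named here, so check the MessageHistory interface in your version before relying on it.
import net.openhft.chronicle.wire.MessageHistory;

// Inside a MethodReader handler: inspect the history captured for the current message.
// timings() is assumed to return the number of recorded hops, timing(n) the nth timestamp.
MessageHistory history = MessageHistory.get();
for (int i = 0; i < history.timings(); i++) {
    System.out.println("hop " + i + " recorded at nanoTime " + history.timing(i));
}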
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender();
the acquireAppender()
uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();
will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()
Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
Note | You should avoid calling this method in latency-sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
For more information on this, see:
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
Move to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@Test
public void read5thMessageTest() {
    try (final ChronicleQueue queue = singleBuilder(getTmpDir()).build()) {
        final ExcerptAppender appender = queue.acquireAppender();

        int i = 0;
        for (int j = 0; j < 10; j++) {
            try (DocumentContext dc = appender.writingDocument()) {
                dc.wire().write("hello").text("world " + (i++));
                long indexWritten = dc.index();
            }
        }

        // Get the current cycle
        int cycle;
        final ExcerptTailer tailer = queue.createTailer();
        try (DocumentContext documentContext = tailer.readingDocument()) {
            long index = documentContext.index();
            cycle = queue.rollCycle().toCycle(index);
        }

        long index = queue.rollCycle().toIndex(cycle, 5);
        tailer.moveToIndex(index);
        try (DocumentContext dc = tailer.readingDocument()) {
            System.out.println(dc.wire().read("hello").text());
        }
    }
}
You can add a StoreFileListener
to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
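Below is a minimal sketch of such a listener; it assumes the StoreFileListener callbacks onAcquired/onReleased and the builder's storeFileListener() method, and the clean-up action itself is illustrative only:
import java.io.File;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.impl.StoreFileListener;

StoreFileListener cleaner = new StoreFileListener() {
    @Override
    public void onAcquired(int cycle, File file) {
        // a roll file has been opened for use by this queue instance
    }

    @Override
    public void onReleased(int cycle, File file) {
        // the queue no longer holds this roll file open; archive, compress or delete it here
        System.out.println("cycle " + cycle + " released: " + file);
    }
};

ChronicleQueue queue = ChronicleQueue.singleBuilder("queue-dir")
        .storeFileListener(cleaner)
        .build();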
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY
cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber
providing up to 4 billion entries per day. If more messages per day are anticipated, the XLARGE_DAILY
cycle, for example, provides up to 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
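A small worked example of that DAILY layout; this is plain arithmetic, no Chronicle Queue API involved, and the numbers are illustrative:
// cycle in the high 32 bits, sequence number in the low 32 bits
int cycle = 18_000;                  // e.g. days since epoch
long sequenceNumber = 5;             // the 6th excerpt written in that cycle
long index = ((long) cycle << 32) | sequenceNumber;

System.out.printf("index = 0x%016x%n", index);   // high word = cycle, low word = sequence
int cycleBack = (int) (index >>> 32);            // 18000
long seqBack = index & 0xFFFFFFFFL;              // 5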
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor
class) as the JVM can crash when allocating a new memory-mapped file if disk space becomes low enough. The disk space monitor checks, for each FileStore you are using Chronicle Queues on, whether there is less than 200MB free. If so, you will see:
Jvm.warn().on(getClass(), "your disk " + fileStore + " is almost full, " +
        "warning: chronicle-queue may crash if it runs out of space.");
otherwise it will check for the threshold percentage and log out this message:
Jvm.warn().on(getClass(), "your disk " + fileStore
        + " is " + diskSpaceFull + "% full, " +
        "warning: chronicle-queue may crash if it runs out of space.");
The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously, Chronicle Queue stores its data off-heap in '.cq4' files. So whenever you wish to append data to, or read data from, such a file, Chronicle Queue will create a file handle. Typically, Chronicle Queue will create a new '.cq4' file every day. However, this can be changed so that a new file is created every hour, every minute or even every second.
If we created a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, if you had written 10 seconds' worth of data and then wished to read this data, Chronicle Queue would have to scan across 10 files. To reduce the creation of file handles, Chronicle Queue caches them lazily, and when it comes to writing data to the queue files, careful consideration must be given to when the files are closed, because on most OSes closing a file forces any data that has been appended to it to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher
is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute()
method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown()
method can be called to close the supplied queue and release any other resources. Invocation of the execute()
method after shutdown()
has been called will cause an IllegalStateException
to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle
(defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
Warning | earlyAcquireNextCycle is off by default, and if it is going to be turned on, you should very carefully stress test before and after turning it on. What you experience will largely depend on your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs
(defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs
.
SingleChronicleQueueExcerpts.dontWrite
(defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle
from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle
is set to true
and pretoucherPrerollTimeMs
to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" );
The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute()
method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try (final SingleChronicleQueue q1 = SingleChronicleQueueBuilder.binary("queue-storage-path").build();
     final Pretoucher pretouch = PretouchUtil.INSTANCE.createPretoucher(q1)) {
    try {
        pretouch.execute();
    } catch (InvalidEventHandlerException e) {
        throw Jvm.rethrow(e);
    }
}
The method close()
, closes the Pretoucher and releases its resources.
pretouch.close();
Note | The Pretoucher is an Enterprise feature. |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
Note | There is a step in latency at around 10 million messages per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument()
is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument()
call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close()
is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true);
Note | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException. This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
Results:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark
), YMMV - always do your own benchmarks:
1 kb
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) --------------------------------
Percentile   run1      run2      run3      % Variation
50:          90.40     90.59     91.17     0.42
90:          179.52    180.29    97.50     36.14
99:          187.33    186.69    186.82    0.05
99.7:        213.57    198.72    217.28    5.86
-------------------------------- SUMMARY (Concurrent2) -------------------------------
Percentile   run1      run2      run3      % Variation
50:          179.14    179.26    180.93    0.62
90:          183.49    183.36    185.92    0.92
99:          192.19    190.02    215.49    8.20
99.7:        240.70    228.16    258.88    8.24
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) --------------------------------
Percentile   run1      run2      run3      % Variation
50:          86.05     85.60     86.24     0.50
90:          170.18    169.79    170.30    0.20
99:          176.83    176.58    177.09    0.19
99.7:        183.36    185.92    183.49    0.88
-------------------------------- SUMMARY (Concurrent2) -------------------------------
Percentile   run1      run2      run3      % Variation
50:          86.24     85.98     86.11     0.10
90:          89.89     89.44     89.63     0.14
99:          169.66    169.79    170.05    0.10
99.7:        175.42    176.32    176.45    0.05
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) --------------------------------
Percentile   run1      run2      run3      % Variation
50:          691.46    699.65    701.18    0.15
90:          717.57    722.69    721.15    0.14
99:          752.90    748.29    748.29    0.00
99.7:        1872.38   1743.36   1780.22   1.39
-------------------------------- SUMMARY (Concurrent2) -------------------------------
Percentile   run1      run2      run3      % Variation
50:          350.59    353.66    353.41    0.05
90:          691.46    701.18    697.60    0.34
99:          732.42    733.95    729.34    0.42
99.7:        1377.79   1279.49   1302.02   1.16
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) --------------------------------
Percentile   run1      run2      run3      % Variation
50:          342.40    344.96    344.45    0.10
90:          357.25    360.32    359.04    0.24
99:          688.38    691.97    691.46    0.05
99.7:        1376.77   1480.19   1383.94   4.43
-------------------------------- SUMMARY (Concurrent2) -------------------------------
Percentile   run1      run2      run3      % Variation
50:          343.68    345.47    346.24    0.15
90:          360.06    362.11    363.14    0.19
99:          694.02    698.62    699.14    0.05
99.7:        1400.32   1510.91   1435.14   3.40
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain
This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop
, you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28	at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012)
	at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
	at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43)
	at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90)
	at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31)
	at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source)
	at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297)
	at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246)
25	at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58)
	at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:748)
21	at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027)
	at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
	at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43)
	at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90)
	at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31)
	at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source)
	at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297)
	at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246)
14	at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54)
	at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal()
if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint
into the code because thread dumps are only reported at safe-points.
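A minimal sketch of dropping such a safepoint between two suspect calls; it assumes chronicle-core's Jvm.safepoint() is available, and the surrounding method and the doSomethingElse() call are illustrative:
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.ExcerptAppender;

void writeMessage(ExcerptAppender appender, String text) {
    appender.writeText(text);   // suspect call A
    Jvm.safepoint();            // thread dumps can now distinguish a stall in A from one in B
    doSomethingElse();          // suspect call B (hypothetical)
}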
Results:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | Throughput |
< 30 microseconds 99.3% of the time | 7 million messages per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 millisecond 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million messages per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.