https://player.vimeo.com/video/201989439
Queue Chronicle هو إطار مراسلة منخفضة الكلية للتطبيقات عالية الأداء.
يغطي هذا المشروع إصدار Java من قائمة انتظار Chronicle. يتوفر إصدار C ++ من هذا المشروع أيضًا ويدعم قابلية التشغيل البيني Java/C ++ بالإضافة إلى روابط لغة إضافية مثل Python. إذا كنت مهتمًا بتقييم إصدار C ++ ، فيرجى الاتصال بـ [email protected].
للوهلة الأولى ، يمكن اعتبار قائمة انتظار Chronicle على أنها مجرد تطبيق قائمة انتظار أخرى . ومع ذلك ، فإنه يحتوي على خيارات تصميم رئيسية يجب التأكيد عليها. باستخدام التخزين خارج الربع ، يوفر Chronicle Queue بيئة حيث لا تعاني التطبيقات من جمع القمامة (GC). عند تنفيذ تطبيقات عالية الأداء وذاكرة (سمعت المصطلح الهائل "BigData"؟) في Java ، واحدة من أكبر المشكلات هي مجموعة القمامة.
يتيح قائمة انتظار Chronicle إضافة الرسائل إلى نهاية قائمة الانتظار ("الملحقة") ، وقراءة من قائمة الانتظار ("Tailed") ، ويدعم أيضًا Seek Random-Access.
يمكنك النظر في قائمة انتظار Chronicle لتكون مشابهة للموضوع المتين/المستمر منخفض الكمون الذي يمكن أن يحتوي على رسائل من أنواع وأحجام مختلفة. قائمة انتظار Chronicle عبارة عن قائمة انتظار مستمرة غير محدودة موزعة:
يدعم RMI غير المتزامن ونشر/اشتراك واجهات مع زمنات زمنية microsecond.
يمرر الرسائل بين JVMs في أسفل microsecond
يمرر الرسائل بين JVMs على أجهزة مختلفة عن طريق النسخ المتماثل في أقل من 10 ميكروثانية (ميزة المؤسسة)
يوفر زمنات زمنية مستقرة وناعمة في الوقت الفعلي في ملايين الرسائل في الثانية لخيط واحد إلى قائمة انتظار واحدة ؛ مع الترتيب الكلي لكل حدث.
عند نشر رسائل 40 بايت ، نسبة مئوية عالية من الوقت الذي نحقق فيه زمنات زمنية أقل من 1 microsecond. يعد الكمون المئوي 99 هو الأسوأ 1 في 100 ، والنسبة المئوية 99.9 هي الأسوأ 1 في 1000 زمن انتقال.
حجم الدُفعة | 10 مليون حدث في الدقيقة | 60 مليون حدث في الدقيقة | 100 مليون حدث في الدقيقة |
---|---|---|---|
99 ٪ الدقاق | 0.78 µs | 0.78 µs | 1.2 µs |
99.9 ٪ Ile | 1.2 µs | 1.3 µs | 1.5 µs |
حجم الدُفعة | 10 مليون حدث في الدقيقة | 60 مليون حدث في الدقيقة | 100 مليون حدث في الدقيقة |
---|---|---|---|
99 ٪ الدقاق | 20 µs | 28 µs | 176 µs |
99.9 ٪ Ile | 901 µs | 705 µs | 5،370 µs |
ملحوظة | 100 مليون حدث في الدقيقة ترسل حدثًا كل 660 نانو ثانية ؛ نسخها واستمرت. |
مهم | لا يتحقق هذا الأداء باستخدام مجموعة كبيرة من الآلات . هذا يستخدم مؤشر ترابط واحد للنشر ، وخيط واحد للاستهلاك. |
تم تصميم قائمة انتظار Chronicle إلى:
كن "متجر كل شيء" يمكنه القراءة مع زمن انتقال الوقت الفعلي. هذا يدعم حتى أنظمة تداول التردد العالية الأكثر تطلبًا. ومع ذلك ، يمكن استخدامه في أي تطبيق حيث يكون تسجيل المعلومات مصدر قلق.
دعم النسخ المتماثل الموثوق مع الإخطار إما لجدندر (كاتب الرسالة) أو خيمة (قارئ الرسالة) ، عندما يتم تكرار رسالة بنجاح.
Queue Chronicle يفترض أن مساحة القرص رخيصة مقارنة بالذاكرة. يستفيد Queue Chronicle بالكامل لمساحة القرص التي لديك ، وبالتالي فأنت لا تقتصر على الذاكرة الرئيسية لجهازك. إذا كنت تستخدم HDD الغزل ، فيمكنك تخزين العديد من TBS من مساحة القرص مقابل تكلفة قليلة.
البرنامج الإضافي الوحيد الذي يحتاجه قائمة انتظار Chronicle هو تشغيل نظام التشغيل. ليس لديه وسيط. بدلاً من ذلك ، يستخدم نظام التشغيل الخاص بك للقيام بكل العمل. إذا مات تطبيقك ، يستمر نظام التشغيل في العمل لثاني ثوانٍ ، لذلك لا يتم فقد أي بيانات ؛ حتى بدون تكرار.
نظرًا لأن قائمة انتظار Chronicle تخزن جميع البيانات المحفوظة في الملفات التي تم تحريكها للذاكرة ، فإن هذا يحتوي على علبة تافهة على رأس الرب ، حتى لو كان لديك أكثر من 100 تيرابايت من البيانات.
بذل كرونيكل جهدًا كبيرًا في تحقيق زمن انتقال منخفض للغاية. في المنتجات الأخرى التي تركز على دعم تطبيقات الويب ، فإن زمن التقدم الذي يقل عن 40 مللي ثانية على ما يرام لأنها أسرع مما ترون ؛ على سبيل المثال ، يبلغ معدل إطار السينما 24 هرتز ، أو حوالي 40 مللي ثانية.
يهدف Chronicle Queue إلى تحقيق اختفاء أقل من 40 ميكروثانية من 99 ٪ إلى 99.99 ٪ من الوقت. باستخدام قائمة انتظار Chronicle بدون تكرار ، فإننا ندعم التطبيقات مع زمنات زمنية أقل من 40 microseconds من طرف إلى طرف عبر خدمات متعددة. غالبًا ما يعتمد زمن انتقال قائمة انتظار Chronicle Carinue بالكامل على اختيار نظام التشغيل والنظام الفرعي للقرص الثابت.
يدعم النسخ المتماثل لقائم انتظار Chronicle Chronicle Wire Enterprise. هذا يدعم ضغط في الوقت الفعلي الذي يحسب الدلتا للكائنات الفردية ، كما هو مكتوب. هذا يمكن أن يقلل من حجم الرسائل بعامل 10 ، أو أفضل ، دون الحاجة إلى التضمين ؛ وهذا هو ، دون تقديم زمن انتقال كبير.
يدعم قائمة انتظار Chronicle أيضًا ضغط LZW و Snappy و GZIP. هذه التنسيقات ولكن تضيف زمن انتقال كبير. هذه مفيدة فقط إذا كان لديك قيود صارمة على عرض النطاق الترددي للشبكة.
يدعم قائمة انتظار Chronicle عددًا من الدلالات:
يتم إعادة تشغيل كل رسالة على إعادة التشغيل.
يتم تشغيل الرسائل الجديدة فقط على إعادة التشغيل.
أعد التشغيل من أي نقطة معروفة باستخدام فهرس الإدخال.
إعادة تشغيل الرسائل التي فاتتها فقط. يتم دعم هذا مباشرة باستخدام MethentReader/MethodWriter Builders.
على معظم أنظمة الأنظمة System.nanoTime()
هو تقريبا عدد النانو ثانية منذ إعادة تشغيل النظام مرة أخرى (على الرغم من أن JVMs مختلفة قد تتصرف بشكل مختلف). هذا هو نفسه عبر jvms على نفس الجهاز ، ولكن يختلف بشكل كبير بين الآلات. الفرق المطلق عندما يتعلق الأمر بالآلات لا معنى له. ومع ذلك ، يمكن استخدام المعلومات للكشف عن القيم المتطرفة ؛ لا يمكنك تحديد أفضل زمن انتقال ، ولكن يمكنك تحديد المدى الذي تبعده عن أفضل الكمون الذي أنت عليه. هذا مفيد إذا كنت تركز على زمن التقدم المئوي 99. لدينا فصل يسمى RunningMinimum
للحصول على توقيت من آلات مختلفة ، مع تعويض الانجراف في nanoTime
بين الآلات. كلما اتخذت القياسات في كثير من الأحيان ، كلما كان هذا الحد الأدنى هو الحد الأدنى.
قائمة انتظار Chronicle تدير التخزين حسب الدورة. يمكنك إضافة StoreFileListener
التي ستعلمك عند إضافة ملف ، وعندما لا يتم الاحتفاظ به بعد الآن. يمكنك نقل أو ضغط أو حذف جميع الرسائل ليوم واحد ، مرة واحدة. ملاحظة: لسوء الحظ على Windows ، إذا تم مقاطعة عملية IO ، فيمكنها إغلاق Filechannel الأساسي.
لأسباب الأداء ، قمنا بإزالة التحقق من المقاطعات في رمز قائمة انتظار Chronicle. لهذا السبب ، نوصيك بتجنب استخدام قائمة انتظار Chronicle باستخدام رمز يولد المقاطعات. إذا لم تتمكن من تجنب توليد المقاطعات ، فنحن نقترح أن تقوم بإنشاء مثيل منفصل من قائمة انتظار Chronicle لكل موضوع.
غالبًا ما تستخدم قائمة انتظار Chronicle للأنظمة التي تركز على المنتج حيث تحتاج إلى الاحتفاظ بالكثير من البيانات لعدة أيام أو سنوات. للاطلاع على الإحصائيات ، انظر استخدام Chronicle-Queue
مهم | لا يدعم قائمة انتظار Chronicle تشغيل أي نظام ملفات شبكة ، سواء كان ذلك NFS أو AFS أو التخزين المستند إلى SAN أو أي شيء آخر. والسبب في ذلك هو أن أنظمة الملفات هذه لا توفر جميع البدائل المطلوبة للملفات التي تم تعيينها في الذاكرة التي تستخدمها قائمة انتظار Chronicle. إذا كانت هناك حاجة إلى أي شبكات (على سبيل المثال لجعل البيانات في متناول العديد من المضيفين) ، فإن الطريقة الوحيدة المدعومة هي نسخ قائمة انتظار Chronicle (ميزة المؤسسة). |
معظم أنظمة المراسلة تركز على المستهلك. يتم تنفيذ التحكم في التدفق لتجنب الحصول على تحميل المستهلك ؛ حتى لحظة. مثال شائع هو خادم يدعم متعددة مستخدمي واجهة المستخدم الرسومية. قد يكون هؤلاء المستخدمون على أجهزة مختلفة (نظام التشغيل والأجهزة) ، والصفات المختلفة للشبكة (الكمون وعرض النطاق الترددي) ، والقيام بمجموعة متنوعة من الأشياء الأخرى في أوقات مختلفة. لهذا السبب ، من المنطقي أن يخبر المستهلك العميل المنتج متى يتراجع ، وتأخير أي بيانات حتى يصبح المستهلك جاهزًا لاتخاذ المزيد من البيانات.
Queue Chronicle هو حل يركز على المنتج ويقوم بكل شيء ممكن لعدم الدفع أبدًا للمنتج ، أو أخبره بالتباطؤ. هذا يجعلها أداة قوية ، حيث توفر مخزن مؤقتًا كبيرًا بين نظامك ، ومنتج المنبع الذي لا يوجد لديك سيطرة عليه.
لا يمنحك ناشرو بيانات السوق خيار التراجع عن المنتج لفترة طويلة ؛ على الإطلاق. عدد قليل من مستخدمينا يستهلكون البيانات من CME Opra. هذا ينتج قممًا من 10 ملايين حدث في الدقيقة ، يتم إرسالها كحزم UDP دون أي إعادة محاولة. إذا فاتتك ، أو أسقطت حزمة ، فسيضيع. عليك أن تستهلك وتسجيل تلك الحزم بأسرع ما تأتي إليك ، مع القليل من التخزين المؤقت في محول الشبكة. بالنسبة لبيانات السوق على وجه الخصوص ، فإن الوقت الفعلي يعني في عدد قليل من microseconds ؛ هذا لا يعني داخل اليوم (خلال اليوم).
قائمة انتظار Chronicle سريعة وفعالة ، وقد تم استخدامها لزيادة السرعة التي يتم إقرار البيانات بين مؤشرات الترابط. بالإضافة إلى ذلك ، فإنه يحتفظ أيضًا بسجل لكل رسالة تم تمريرها مما يتيح لك تقليل كمية التسجيل التي تحتاج إلى القيام بها بشكل كبير.
مطلوب أنظمة الامتثال من خلال المزيد والمزيد من الأنظمة هذه الأيام. يجب على الجميع الحصول عليها ، ولكن لا أحد يريد أن يتباطأ من قبلهم. باستخدام قائمة انتظار Chronicle للتخزين المؤقت للبيانات بين الأنظمة المراقبة ونظام الامتثال ، لا داعي للقلق بشأن تأثير تسجيل الامتثال لأنظمة المراقبة الخاصة بك. مرة أخرى ، يمكن لـ Chronicle Queue دعم ملايين الأحداث لكل ثانية ، وبيانات كل خادم ، والوصول إلى البيانات التي تم الاحتفاظ بها لسنوات.
يدعم قائمة انتظار Chronicle IPC منخفضة الكمون (اتصال العمليات) بين JVMs على نفس الجهاز بترتيب حجم 1 microsecond ؛ وكذلك بين الآلات ذات الكمون النموذجي من 10 ميكروثانية لإنتاجية متواضعة بضع مئات من الآلاف. يدعم قائمة انتظار Chronicle إنتاجية ملايين الأحداث في الثانية ، مع زمنات زمنية دقيقة مستقرة.
انظر مقالات عن استخدام قائمة انتظار Chronicle في الخدمات الصغيرة
يمكن استخدام قائمة انتظار Chronicle لبناء آلات الحالة. يمكن إعادة إنتاج جميع المعلومات حول حالة هذه المكونات خارجيًا ، دون الوصول المباشر إلى المكونات ، أو إلى حالتها. هذا يقلل بشكل كبير من الحاجة إلى تسجيل إضافي. ومع ذلك ، يمكن تسجيل أي قطع تسجيل تحتاجها بتفصيل كبير. وهذا يجعل تمكين تسجيل DEBUG
في الإنتاج العملي. وذلك لأن تكلفة قطع الأشجار منخفضة للغاية ؛ أقل من 10 ميكروثانية. يمكن تكرار السجلات مركزيا لتوحيد السجل. يتم استخدام قائمة انتظار Chronicle لتخزين أكثر من 100 تيرابايت من البيانات ، والتي يمكن إعادة تشغيلها من أي نقطة في الوقت المناسب.
مكونات التدفق غير المدمجة هي أداء للغاية ، حتمية ، وقابلة للتكرار. يمكنك إعادة إنتاج الأخطاء التي تظهر فقط بعد مليون حدث تم لعبها بترتيب معين ، مع توقيت واقعية متسارعة. هذا يجعل استخدام معالجة الدفق جذابة للأنظمة التي تحتاج إلى درجة عالية من نتائج الجودة.
الإصدارات متوفرة على Maven Central على النحو التالي:
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >
راجع ملاحظات إصدار قائمة انتظار Chronicle واحصل على أحدث رقم إصدار. تتوفر لقطات على https://oss.sonatype.org
ملحوظة | الفئات الموجودة في أي من الحزم "الداخلية" و "الضمنا" و "الرئيسية" (الأخيرة التي تحتوي على طرق رئيسية مختلفة قابلة للتشغيل) وأي عبوات فرعية ليست جزءًا من واجهة برمجة التطبيقات العامة وقد تصبح عرضة للتغيير في أي الوقت لأي سبب . راجع ملفات package-info.java ذات الصلة info.java للحصول على التفاصيل. |
في Chronicle Queue V5 ، أصبحت مصمّفات V5 الآن قاتلة فقط ، في قائمة انتظار Chronicle V4 ، كان لدينا مفهوم الفهرسة البطيئة ، حيث لن يكتب الملاحقون فهارس ولكن بدلاً من ذلك يمكن أن يتم الفهرسة بواسطة الخياط. قررنا إسقاط الفهرسة كسول في V5 ؛ إن جعل المصيدين يقرؤون فقط يبسط قائمة انتظار Chronicle فحسب ، بل يتيح لنا أيضًا إضافة تحسينات في مكان آخر في الكود.
تم تغيير نموذج القفل لقائمة انتظار Chronicle في V5 ، في قائمة انتظار Chronicle v4 ، يوجد قفل الكتابة (لمنع كتابة المكتبات المتزامنة إلى قائمة الانتظار) في ملف .cq4. في V5 تم نقل هذا إلى ملف واحد يسمى متجر الجدول (Metadata.cq4t). هذا يبسط رمز القفل داخليًا حيث يجب فحص ملف متجر الجدول فقط.
يمكنك استخدام Chronicle Queue V5 لقراءة الرسائل المكتوبة باستخدام Chronicle Queue V4 ، ولكن هذا ليس مضمونًا للعمل دائمًا - إذا قمت ، على سبيل المثال ، بإنشاء قائمة انتظار V4 الخاصة بك باستخدام wireType(WireType.FIELDLESS_BINARY)
، فلن تتمكن Chronicle Queue V5 اقرأ رأس قائمة الانتظار. لدينا بعض الاختبارات لقوائم V5 Reading V4 ولكن هذه محدودة وقد لا يتم دعم جميع السيناريوهات.
لا يمكنك استخدام Queue Queue V5 للكتابة إلى قائمة انتظار Queue V4 Chronicle.
Chronicle Queue V4 عبارة عن إعادة كتابة كاملة من قائمة انتظار Chronicle التي تحل المشكلات التالية التي كانت موجودة في V3.
بدون الرسائل الموصوفة ذاتيًا ، كان على المستخدمين إنشاء وظائفهم الخاصة لإلقاء الرسائل وتخزين البيانات على المدى الطويل. مع V4 ليس عليك القيام بذلك ، ولكن يمكنك إذا كنت ترغب في ذلك.
من شأن قائمة انتظار الفانيليا كرونيكل إنشاء ملف لكل مؤشر ترابط. هذا أمر جيد إذا تم التحكم في عدد المواضيع ، ومع ذلك ، فإن العديد من التطبيقات لديها سيطرة ضئيلة أو معدومة على عدد المواضيع المستخدمة وهذا تسبب في مشاكل قابلية الاستخدام.
كان التكوين الخاص بفهرسة وفانيليا كرونيكل بالكامل في الكود ، لذلك كان على القارئ أن يكون له نفس التكوين مثل الكتاب ولم يكن واضحًا دائمًا ما كان عليه.
لم تكن هناك طريقة للمنتج لمعرفة مقدار البيانات التي تم تكرارها إلى الجهاز الثاني. كان الحل الوحيد هو تكرار البيانات مرة أخرى إلى المنتجين.
كنت بحاجة إلى تحديد حجم البيانات للحجز قبل أن تبدأ في كتابة رسالتك.
كنت بحاجة إلى قفل الخاص بك للملحق عند استخدام Chronicle المفهرسة.
في قائمة انتظار Chronicle V3 ، كان كل شيء من حيث البايت ، وليس الأسلاك. هناك طريقتان لاستخدام البايت في Queue Queue V4. يمكنك استخدام أساليب writeBytes
و readBytes
، أو يمكنك الحصول على bytes()
من السلك. على سبيل المثال:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ()));
try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}
Chronicle Queue Enterprise Edition هي نسخة مدعومة تجاريًا من قائمة انتظار Open Source Chronicle الناجحة. يتم تمديد الوثائق المفتوحة المصدر بواسطة المستندات التالية لوصف الميزات الإضافية المتوفرة عندما تكون مرخصًا لإصدار Enterprise. هذه هي:
تشفير قوائم انتظار الرسائل والرسائل. لمزيد من المعلومات ، راجع وثائق التشفير.
تكرار TCP/IP (واختياريا UDP) بين المضيفين لضمان النسخ الاحتياطي في الوقت الحقيقي لجميع بيانات قائمة الانتظار الخاصة بك. لمزيد من المعلومات ، راجع وثائق النسخ المتماثل ، يتم تغطية بروتوكول النسخ المتماثل في قائمة الانتظار في بروتوكول النسخ المتماثل.
دعم المنطقة الزمنية لجدولة التمديد في قائمة الانتظار اليومية. لمزيد من المعلومات ، انظر دعم المنطقة الزمنية.
دعم وضع ASYNC لإعطاء أداء محسّن عند الإنتاجية العالية على أنظمة الملفات الأبطأ. لمزيد من المعلومات ، راجع وضع Async وأداء الأداء أيضًا.
مسبق اللعنة لتحسين القيم المتطرفة ، انظر قبل اللعاد وتكوينه
بالإضافة إلى ذلك ، سيتم دعمك بالكامل من قبل خبرائنا الفنيين.
لمزيد من المعلومات حول إصدار Chronicle Queue Enterprise ، يرجى الاتصال بـ [email protected].
يتم تعريف قائمة انتظار Chronicle بواسطة SingleChronicleQueue.class
مصمم لدعم:
الملفات على أساس يومي أو أسبوعي أو كل ساعة ،
كتاب متزامن على نفس الجهاز ،
القراء المتزامنين على نفس الجهاز أو عبر آلات متعددة عبر تكرار TCP (مع مؤسسة قائمة انتظار Chronicle) ،
القراء والكتاب المتزامن بين Docker أو غيرها من أعباء العمل الحاوية
صفر نسخ التسلسل والخروج ،
ملايين الكتابة/القراءات في الثانية على أجهزة السلع.
ما يقرب من 5 ملايين رسالة/ثانية لرسائل 96 بايت على معالج i7-4790. هيكل دليل قائمة الانتظار هو كما يلي:
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling.
يتكون التنسيق من بايت مُعدة بحجم يتم تنسيقه باستخدام BinaryWire
أو TextWire
. تم تصميم قائمة انتظار Chronicle ليتم طردها من الكود. يمكنك بسهولة إضافة واجهة تناسب احتياجاتك.
ملحوظة | نظرًا للتشغيل المنخفض المستوى إلى حد ما ، يمكن لعمليات قراءة/كتابة قائمة انتظار Chronicle إلقاء استثناءات غير محددة. من أجل منع وفاة الخيط ، قد يكون من العملي التقاط RuntimeExceptions وتسجيل/تحليلها حسب الاقتضاء. |
ملحوظة | للاطلاع على مظاهرات كيف يمكن استخدام قائمة انتظار Chronicle ، انظر Demo Chronicle Queue Demo و Documentation Java ، انظر Chronicle Queue Javadocs |
في الأقسام التالية ، نقدم أولاً بعض المصطلحات ومرجع سريع لاستخدام قائمة انتظار Chronicle. ثم ، نقدم دليلًا أكثر تفصيلاً.
Queue Chronicle عبارة عن مجلة مستمرة للرسائل التي تدعم الكتاب والقراء المتزامنين حتى عبر JVMs متعددة على نفس الجهاز. يرى كل قارئ كل رسالة ، ويمكن للقارئ الانضمام في أي وقت ولا يزال يرى كل رسالة.
ملحوظة | نتجنب عمدا مصطلح المستهلك ونستخدم القارئ بدلاً من ذلك لأن الرسائل لا يتم استهلاكها/تدميرها عن طريق القراءة. |
يحتوي قائمة انتظار Chronicle على المفاهيم الرئيسية التالية:
مقتطفات
مقتطفات هي حاوية البيانات الرئيسية في قائمة انتظار Chronicle. بمعنى آخر ، يتكون كل قائمة انتظار Chronicle من مقتطفات. تعني كتابة رسالة إلى قائمة انتظار Chronicle بدءًا من مقتطفات جديدة ، وكتابة رسالة فيها ، وإنهاء المقتطف في النهاية.
الجسد
الجيد هو مصدر الرسائل ؛ شيء مثل التكرار في بيئة كرونيكل. يمكنك إضافة البيانات إلحاق قائمة انتظار Chronicle الحالية. يمكنه إجراء عمليات كتابة متسلسلة عن طريق إلحاق نهاية قائمة الانتظار فقط. لا توجد وسيلة لإدراج المقتطفات أو حذفها.
خياط
الخياط هو قارئ مقتطف محسن للقراءات المتسلسلة. يمكن أن تنفيذ قراءات متسلسلة وعشوائية ، سواء إلى الأمام أو للخلف. يقرأ الخياطون الرسالة التالية المتاحة في كل مرة يتم استدعاؤها. يضمن ما يلي في قائمة انتظار Chronicle:
لكل جيدندر ، تتم كتابة الرسائل بالترتيب الذي كتبه الجسد لهم. رسائل من قبل الملاحق المختلفة متشابكة ،
لكل خيمة ، سترى كل رسالة لموضوع في نفس الترتيب مثل كل حرمان آخر ،
عند تكرارها ، كل نسخة طبق الأصل لديها نسخة من كل رسالة.
قائمة انتظار Chronicle هو الوسيط أقل. إذا كنت بحاجة إلى بنية مع وسيط ، فيرجى الاتصال بـ [email protected].
ملفات الملفات وقائمة الانتظار
تم تصميم قائمة انتظار Chronicle لتدحرج ملفاتها اعتمادًا على دورة لفة المختارة عند إنشاء قائمة الانتظار (انظر Rollcycles). بمعنى آخر ، يتم إنشاء ملف قائمة انتظار لكل دورة لفة تحتوي على امتداد cq4
. عندما تصل دورة Roll إلى النقطة التي يجب أن تتدحرج ، ستقوم Appender بكتابة علامة EOF
بشكل ذري في نهاية الملف الحالي للإشارة إلى أنه لا ينبغي أن يكتب أي جدح آخر إلى هذا الملف ولا ينبغي أن يقرأ أي حرمان ، وبدلاً من ذلك يجب على الجميع استخدام ملف جديد.
إذا تم إيقاف العملية ، وإعادة تشغيلها لاحقًا عندما تكون دورة اللولب تستخدم ملفًا جديدًا ، فسيحاول أحد الجداول تحديد موقع الملفات القديمة وكتابة علامة EOF
فيها للمساعدة في صياغة قراءتها.
موضوعات
كل موضوع هو دليل ملفات قائمة الانتظار. إذا كان لديك موضوع يسمى mytopic
، فقد يبدو التصميم هكذا:
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4
لنسخ جميع البيانات ليوم واحد (أو دورة) ، يمكنك نسخ الملف لهذا اليوم إلى جهاز التطوير الخاص بك لاختبار إعادة التشغيل.
قيود على الموضوعات والرسائل
تقتصر المواضيع على كونها سلاسل يمكن استخدامها كأسماء الدليل. ضمن موضوع ما ، يمكن أن يكون لديك طبقات فرعية يمكن أن تكون أي نوع بيانات يمكن تسلسلها. يمكن أن تكون الرسائل أي بيانات قابلة للتسلسل.
يدعم قائمة انتظار Chronicle:
الأشياء Serializable
، على الرغم من أن هذا يجب تجنبه لأنه غير فعال
يتم تفضيل الكائنات Externalizable
إذا كنت ترغب في استخدام واجهات برمجة تطبيقات Java القياسية.
byte[]
String
Marshallable
رسالة تصف نفسها والتي يمكن كتابتها باسم Yaml أو Binary Yaml أو JSON.
BytesMarshallable
وهو ترميز ثنائي المستوى ، أو ترميز النص.
يوفر هذا القسم مرجعًا سريعًا لاستخدام قائمة انتظار Chronicle لإظهار كيفية إنشاء ، الكتابة/القراءة في/من قائمة انتظار.
Chronicle Queue Construction
يختلف إنشاء مثيل لقائمة انتظار Chronicle عن مجرد استدعاء مُنشئ. لإنشاء مثيل ، يجب عليك استخدام ChronicleQueueBuilder
.
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build ();
في هذا المثال ، أنشأنا IndexedChronicle
يقوم بإنشاء اثنين RandomAccessFiles
. واحد للفهارس ، وواحدة للبيانات التي لها أسماء نسبيا:
${java.io.tmpdir}/getting-started/{today}.cq4
الكتابة إلى قائمة انتظار
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
القراءة من قائمة انتظار
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
أيضا ، يمكن استخدام طريقة ChronicleQueue.dump()
لتفريغ المحتويات الخام كسلسلة.
queue . dump ();
تنظيف
تقوم Chronicle Queue بتخزين بياناتها خارج المنزل ، ويوصى بالاتصال بـ close()
بمجرد الانتهاء من العمل مع Queue Chronicle ، لتحرير الموارد.
ملحوظة | لن تضيع أي بيانات إذا قمت بذلك. هذا هو فقط لتنظيف الموارد التي تم استخدامها. |
queue . close ();
وضع كل شيء معًا
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
}
يمكنك تكوين قائمة انتظار Chronicle باستخدام معلمات التكوين أو خصائص النظام. بالإضافة إلى ذلك ، هناك طرق مختلفة للكتابة/القراءة إلى/من قائمة انتظار مثل استخدام الوكلاء واستخدام MethodReader
و MethodWriter
.
يمكن تكوين قائمة انتظار Chronicle (CQ) عبر عدد من الطرق في فئة SingleChronicleQueueBuilder
. ويرد أدناه شرح عدد قليل من المعلمات التي تم الاستعلام عنها من قبل عملائنا.
دراجة الانقلاب
تقوم المعلمة RollCycle
بتكوين المعدل الذي ستقوم به CQ لفائف ملفات قائمة الانتظار الأساسية. على سبيل المثال ، سيؤدي استخدام مقتطف الرمز التالي إلى تراجع ملفات قائمة الانتظار (أي ملف جديد تم إنشاؤه) كل ساعة:
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build ()
بمجرد تعيين دورة لفة في قائمة الانتظار ، لا يمكن تغييرها في وقت لاحق. يجب تكوين أي مثيلات أخرى من SingleChronicleQueue
التي تم تكوينها لاستخدام المسار نفسه لاستخدام نفس دورة اللولب ، وإذا لم تكن كذلك ، فسيتم تحديث دورة اللولب لتتناسب مع دورة اللولب المستمرة. في هذه الحالة ، سيتم طباعة رسالة سجل تحذير من أجل إخطار مستخدم المكتبة بالموقف:
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}
إخراج وحدة التحكم:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.
يعتمد الحد الأقصى لعدد الرسائل التي يمكن تخزينها في ملف قائمة الانتظار على دورة Roll. انظر الأسئلة الشائعة لمزيد من المعلومات حول هذا.
في قائمة انتظار Chronicle ، يعتمد وقت التمرير على UTC. تعمل ميزة Enterprise TimeZone على توسيع قدرة Chronicle Queue على تحديد وقت ودورة عمليات نقل قائمة الانتظار ، بدلاً من UTC. لمزيد من المعلومات ، راجع TimeZone Leenue Rollover.
توفر فئة قائمة انتظار Chronicle FileUtil
طرقًا مفيدة لإدارة ملفات قائمة الانتظار. انظر إدارة ملفات Roll مباشرة.
النموذج التنصري
من الممكن تكوين كيفية قيام قائمة انتظار Chronicle بتخزين البيانات عن طريق ضبط WireType
بشكل صريح:
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )
على سبيل المثال:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )
على الرغم من أنه من الممكن توفير النموذج التنصري بشكل صريح عند إنشاء منشئ ، إلا أنه لا يتم تثبيته حيث لا يتم دعم جميع أنواع الأسلاك بواسطة قائمة انتظار Chronicle حتى الآن. على وجه الخصوص ، لا يتم دعم أنواع الأسلاك التالية:
نص (وكل ذلك يعتمد بشكل أساسي على النص ، بما في ذلك JSON و CSV)
خام
read_any
الكتل
عندما تتم قراءة/كتابة قائمة انتظار ، يتم تعيين جزء من الملف الذي يتم قراءته/المكتوبة حاليًا إلى مقطع ذاكرة. تتحكم هذه المعلمة في حجم كتلة تعيين الذاكرة. يمكنك تغيير هذه المعلمة باستخدام الطريقة SingleChronicleQueueBuilder.blockSize(long blockSize)
إذا كانت ضرورية.
ملحوظة | يجب عليك تجنب تغيير blockSize دون داع. |
إذا كنت ترسل رسائل كبيرة ، فيجب عليك تعيين blockSize
كبيرة ، أي يجب أن يكون blockSize
أربعة أضعاف حجم الرسالة على الأقل.
تحذير | إذا كنت تستخدم blockSize صغيرة للرسائل الكبيرة ، فستتلقى إحباطًا IllegalStateException ويتم إحباط الكتابة. |
نوصيك باستخدام نفس blockSize
لكل مثيل قائمة انتظار عند تكرار قوائم الانتظار ، لا تتم كتابة blockSize
إلى بيانات قائمة الانتظار ، لذلك يجب ضبطها على نفس القيمة عند إنشاء مثيلات قائمة انتظار Chronicle (يوصى بذلك ولكن إذا كنت ترغب في ذلك لتشغيل مع blocksize
مختلفة يمكنك).
نصيحة | استخدم نفس blockSize لكل مثيل من قوائم الانتظار المتكررة. |
indexspacing
توضح هذه المعلمة المسافة بين مقتطفات مفهرسة بشكل صريح. العدد الأعلى يعني أداء كتابة متسلسل أعلى ولكن قراءة الوصول العشوائي أبطأ. لا يتأثر أداء القراءة المتسلسل بهذه الخاصية. على سبيل المثال ، يمكن إرجاع تباعد الفهرس الافتراضي التالي:
16 (بدقة)
64 (يوميا)
يمكنك تغيير هذه المعلمة باستخدام طريقة SingleChronicleQueueBuilder.indexSpacing(int indexSpacing)
.
IndexCount
حجم كل صفيف فهرس ، وكذلك العدد الإجمالي لمصفوفات الفهرس لكل ملف قائمة انتظار.
ملحوظة | IndexCount 2 هو الحد الأقصى لعدد إدخالات قائمة الانتظار المفهرسة. |
ملحوظة | انظر القسم المقتطف الفهرسة في قائمة انتظار Chronicle من دليل المستخدم هذا لمزيد من المعلومات وأمثلة من استخدام الفهارس. |
readbuffermode ، trintbuffermode
تحدد هذه المعلمات buffermode للقراءات أو الكتابة التي لها الخيارات التالية:
None
- الافتراضي (والوحيد الوحيد المتاح للمستخدمين مفتوح المصدر) ، لا يوجد تخزين مؤقت ؛
Copy
- تستخدم بالاقتران مع التشفير ؛
Asynchronous
- استخدم عازلة غير متزامنة عند القراءة و/أو الكتابة ، والتي يوفرها وضع Chronicle Async.
التبعية
سعة Ringbuffer في البايتات عند استخدام bufferMode: Asynchronous
في قائمة انتظار Chronicle ، نشير إلى فعل كتابة بياناتك إلى قائمة انتظار Chronicle ، كخزانة مقتطفات. يمكن تعويض هذه البيانات من أي نوع بيانات ، بما في ذلك النص أو الأرقام أو النقط التسلسلية. في النهاية ، يتم تخزين جميع بياناتك ، بغض النظر عن ما هي عليه ، كسلسلة من البايتات.
قبل تخزين مقتطفاتك مباشرة ، تحتفظ Chronicle Queue برأس 4 بايت. يكتب Queue Chronicle طول بياناتك في هذا الرأس. وبهذه الطريقة ، عندما يأتي Chronicle Queue لقراءة مقتطفات ، فإنه يعرف إلى متى تستغرق كل نقطة من البيانات. نشير إلى هذا الرأس 4 بايت ، جنبا إلى جنب مع مقتطفاتك ، كمستند. يمكن استخدام قائمة انتظار Chronicle بصراحة لقراءة وكتابة المستندات.
ملحوظة | ضمن هذا الرأس المكون من 4 بايت ، نحتفظ أيضًا ببعض البتات لعدد من العمليات الداخلية ، مثل القفل ، لجعل مؤشر ترابط قائمة انتظار Chronicle عبر كل من المعالجات والموضوعات. الشيء المهم الذي يجب ملاحظته هو أنه بسبب هذا ، لا يمكنك تحويل البايتات الأربعة بشكل صارم إلى عدد صحيح للعثور على طول كتلة البيانات الخاصة بك. |
كما ذُكر من قبل ، يستخدم Chronicle Queue ملحقًا للكتابة إلى قائمة الانتظار والخياط للقراءة من قائمة الانتظار. على عكس حلول طابور Java الأخرى ، لا تضيع الرسائل عند قراءتها مع حرمان. يتم تغطية هذا بمزيد من التفاصيل في القسم أدناه حول "القراءة من قائمة انتظار باستخدام خياط". لكتابة البيانات إلى قائمة انتظار Chronicle ، يجب عليك أولاً إنشاء أحد الجداول:
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}
يستخدم قائمة انتظار Chronicle الواجهة المنخفضة المستوى التالية لكتابة البيانات:
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
}
الإغلاق في المحاولة مع الموارد ، هو النقطة التي يتم فيها كتابة طول البيانات إلى الرأس. يمكنك أيضًا استخدام DocumentContext
لمعرفة الفهرس الذي تم تعيين بياناتك للتو (انظر أدناه). يمكنك لاحقًا استخدام هذا الفهرس للانتقال إلى/البحث عن هذا المقتطف. كل مقتطفات قائمة انتظار Chronicle لديها فهرس فريد.
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
}
الأساليب عالية المستوى أدناه مثل writeText()
هي أساليب راحة في الاتصال على appender.writingDocument()
، ولكن كلا النهجين يفعلان الشيء نفسه بشكل أساسي. يبدو أن الكود الفعلي لـ writeText(CharSequence text)
هكذا:
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}
لذلك لديك خيار لعدد من الواجهات عالية المستوى ، وصولاً إلى واجهة برمجة تطبيقات منخفضة المستوى ، إلى الذاكرة الخام.
هذا هو أعلى مستوى من واجهة برمجة التطبيقات التي تخفي حقيقة أنك تكتب للرسائل على الإطلاق. الفائدة هي أنه يمكنك تبديل المكالمات إلى الواجهة بمكون حقيقي ، أو واجهة لبروتوكول مختلف.
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));
يمكنك كتابة "رسالة وصف ذاتي". يمكن أن تدعم هذه الرسائل تغييرات المخطط. كما أنها أسهل في الفهم عند تصحيح الأخطاء أو تشخيص المشكلات.
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));
يمكنك كتابة "البيانات الخام" التي تصف ذاتيا. ستكون الأنواع صحيحة دائمًا ؛ الموقف هو المؤشر الوحيد على معنى تلك القيم.
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));
يمكنك كتابة "البيانات الخام" التي لا تصف ذاتيا. يجب أن يعرف القارئ معنى هذه البيانات ، والأنواع التي تم استخدامها.
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));
أدناه ، يتم توضيح أدنى مستوى لكتابة البيانات. يمكنك الحصول على عنوان للذاكرة الخام ويمكنك كتابة ما تريد.
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});
يمكنك طباعة محتويات قائمة الانتظار. يمكنك رؤية أول رسالتين ، وتخزين رسالتين آخران نفس البيانات.
// dump the content of the queue
System . out . println ( queue . dump ());
مطبوعات:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World
تتبع قراءة قائمة الانتظار نفس نمط الكتابة ، إلا أن هناك احتمال عدم وجود رسالة عند محاولة قراءتها.
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
}
يمكنك تحويل كل رسالة إلى استدعاء طريقة بناءً على محتوى الرسالة ، وأن يكون لديك قائمة انتظار Chronicle تلقائيًا تخلص من وسيطات الطريقة. سيتخلى الاتصال على reader.readOne()
تلقائيًا (تصفية) أي رسائل لا تتطابق مع قارئ الطريقة.
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());
يمكنك فك تشفير الرسالة بنفسك.
ملحوظة | لا يجب أن تتطابق الأسماء والنوع وترتيب الحقول. |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));
يمكنك قراءة قيم البيانات الموصوفة الذاتي. سيؤدي ذلك إلى التحقق من أن الأنواع صحيحة ، وتحويلها كما هو مطلوب.
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));
يمكنك قراءة البيانات الأولية على أنها بدائية وسلاسل.
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));
أو يمكنك الحصول على عنوان الذاكرة الأساسي والوصول إلى الذاكرة الأصلية.
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
}));
ملحوظة | يرى كل حرمان كل رسالة. |
يمكن إضافة التجريد إلى رسائل تصفية ، أو تعيين رسائل إلى معالج رسالة واحد فقط. ومع ذلك ، بشكل عام ، لا تحتاج إلا إلى حرمان رئيسي واحد لموضوع ما ، مع ربما ، بعض المصيدين الداعمين للمراقبة وما إلى ذلك.
نظرًا لأن قائمة انتظار Chronicle لا تقسم موضوعاته ، فستحصل على طلب تام لجميع الرسائل ضمن هذا الموضوع. عبر الموضوعات ، لا يوجد ضمان للطلب ؛ إذا كنت ترغب في إعادة التشغيل بشكل حتمي من نظام يستهلك من موضوعات متعددة ، فإننا نقترح إعادة التشغيل من إخراج هذا النظام.
قد تقوم شركة Chronicle Leaue Tailers بإنشاء معالجات ملفات ، يتم تنظيف معالجات الملفات كلما تم استدعاء طريقة قائمة قائمة انتظار Chronicle close()
أو كلما قام JVM بتشغيل مجموعة القمامة. إذا كنت تكتب الكود الخاص بك ، فلا تحتوي على توقف مؤقت GC وترغب صراحة في تنظيف معالجات الملفات ، يمكنك الاتصال بما يلي:
(( StoreTailer ) tailer ). releaseResources ()
ExcerptTailer.toEnd()
في بعض التطبيقات ، قد يكون من الضروري البدء في القراءة من نهاية قائمة الانتظار (على سبيل المثال في سيناريو إعادة التشغيل). بالنسبة لحالة الاستخدام هذه ، يوفر ExcerptTailer
طريقة toEnd()
. عندما يكون اتجاه الخياط إلى FORWARD
(بشكل افتراضي toEnd()
أو كما تم تعيينه بواسطة طريقة ExcerptTailer.direction
. في هذه الحالة ، أصبح الخياط جاهزًا الآن لقراءة أي سجلات جديدة تم إلحاقها بقائمة الانتظار. حتى يتم إلحاق أي رسائل جديدة بقائمة الانتظار ، فلن يكون هناك DocumentContext
جديدة متاحة للقراءة:
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();
إذا كان من الضروري القراءة للخلف من خلال قائمة الانتظار من النهاية ، فيمكن ضبط الخياط للقراءة للخلف:
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd ();
عند القراءة للخلف ، ستنقل طريقة toEnd()
الخياط إلى السجل الأخير في قائمة الانتظار. إذا لم تكن قائمة الانتظار فارغة ، فسيكون هناك DocumentContext
متاحة للقراءة:
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();
الملقب المسمى الخياط.
قد يكون من المفيد أن يكون لديك خياط يستمر من حيث كان الأمر كذلك عند إعادة تشغيل التطبيق.
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}
حرم "أ" آخر رسالة يقرأ 2
حرم "A" القراءة التالية الرسالة 3
يقرأ آخر "ب" آخر رسالة 0
يرفع "ب" القراءة التالية الرسالة 1
This is from the RestartableTailerTest
where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart()
, toEnd()
, moveToIndex()
or reads a message.
ملحوظة | The direction() is not preserved across restarts, only the next index to be read. |
ملحوظة | The index of a tailer is only progressed when the DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4
:
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
This can often be a bit difficult to read, so it is better to dump the cq4
files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain
or net.openhft.chronicle.queue.ChronicleReaderMain
. DumpMain
performs a simple dump to the terminal while ChronicleReaderMain
handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4
file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar
, from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
نصيحة | To find out which version of jars to include please, refer to the chronicle-bom . |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
This will dump the 19700101-02.cq4
file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining
ملحوظة | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh
located in the Chonicle-Queue/bin
-folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/dump_queue.sh <file path>
ChronicleReaderMain
The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain
(in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain
:
usage: ChronicleReaderMain -a <binary-arg> Argument to pass to binary search class -b <binary-search> Use this class as a comparator to binary search -cbl <content-based-limiter> Specify a content-based limiter -cblArg <content-based-limiter-argument> Specify an argument for use by the content-based limiter -d <directory> Directory containing chronicle queue files -e <exclude-regex> Do not display records containing this regular expression -f Tail behaviour - wait for new records to arrive -g Show message history (when using method reader) -h Print this help and exit -i <include-regex> Display records containing this regular expression -k Read the queue in reverse -l Squash each output message into a single line -m <max-history> Show this many records from the end of the data set -n <from-index> Start reading from this index (eg 0x123ABE) -named <named> Named tailer ID -r <as-method-reader> Use when reading from a queue generated using a MethodWriter -s Display index -w <wire-type> Control output ie JSON -x <max-results> Limit the number of results to output -z Print timestamps using the local timezone
Just as with DumpQueue
you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build >
Once the Uber jar is present, you can run ChronicleReaderMain
from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh
which again is located in the Chonicle-Queue/bin
-folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/queue_reader.sh <options>
ChronicleWriter
If using MethodReader
and MethodWriter
then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain
or the shell script queue_writer.sh
eg
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method name
If you want to write to the below "doit" method
public interface MyInterface {
void doit ( DTO dto );
}
public class DTO extends SelfDescribingMarshallable { private int age; اسم السلسلة الخاصة ؛ }
Then you can call ChronicleWriterMain -d queue doit x.yaml
with either (or both) of the below Yamls:
{
age : 19,
name : Henry
}
أو
!x.y.z.DTO {
age : 42,
name : Percy
}
If DTO
makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface
, where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}
To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));
These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}
To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());
Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end over across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId
of the queue.
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));
A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}
First write
First read
Write of the result of the read.
What triggered this event.
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender();
the acquireAppender()
uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();
will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()
Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
ملحوظة | You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
for more information on this see :
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
Move to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
}
You can add a StoreFileListener
to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY
cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber
providing up to 4 billion entries per day. if more messages per day are anticipated, the XLARGE_DAILY
cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor
class) as the JVM can crash when allocating a new memory mapped file if disk space becomes low enough. The disk space monitor checks (for each FileStore you are using Chronicle Queues on): that there is less than 200MB free. If so you will see:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );
otherwise it will check for the threshold percentage and log out this message:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );
The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously Chronicle Queue stores its data off-heap in a '.cq4' file. So whenever you wish to append data to this file or read data into this file, chronicle queue will create a file handle . Typically, Chronicle Queue will create a new '.cq4' file every day. However, this could be changed so that you can create a new file every hour, every minute or even every second.
If we create a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, If you had written 10 seconds worth of data and then you wish to read this data, chronicle would have to scan across 10 files. To reduce the creation of the file handles, chronicle queue cashes them lazily and when it comes to writing data to the queue files, care-full consideration must be taken when closing the files, because on most OS's a close of the file, will force any data that has been appended to the file, to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher
is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute()
method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown()
method can be called to close the supplied queue and release any other resources. Invocation of the execute()
method after shutdown()
has been called will cause an IllegalStateException
to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle
(defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
تحذير | earlyAcquireNextCycle is off by default and if it is going to be turned on, you should very carefully stress test before and after turning it on. Basically what you experience is related to your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs
(defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs
.
SingleChronicleQueueExcerpts.dontWrite
(defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle
from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle
is set to true
and pretoucherPrerollTimeMs
to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" );
The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute()
method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
}
The method close()
, closes the Pretoucher and releases its resources.
pretouch . close ();
ملحوظة | The Pretoucher is an Enterprise feature |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
ملحوظة | There is a step in latency at around 10 million message per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument()
is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument()
call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close()
is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true);
ملحوظة | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException . This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
نتائج:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark
), YMMV - always do your own benchmarks:
1 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 90.40 90.59 91.17 0.42 90: 179.52 180.29 97.50 36.14 99: 187.33 186.69 186.82 0.05 99.7: 213.57 198.72 217.28 5.86 ------------------------------------------------- ------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 179.14 179.26 180.93 0.62 90: 183.49 183.36 185.92 0.92 99: 192.19 190.02 215.49 8.20 99.7: 240.70 228.16 258.88 8.24 ------------------------------------------------- ------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.05 85.60 86.24 0.50 90: 170.18 169.79 170.30 0.20 99: 176.83 176.58 177.09 0.19 99.7: 183.36 185.92 183.49 0.88 ------------------------------------------------- ------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.24 85.98 86.11 0.10 90: 89.89 89.44 89.63 0.14 99: 169.66 169.79 170.05 0.10 99.7: 175.42 176.32 176.45 0.05 ------------------------------------------------- ------------------------------------------------- --------------
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 691.46 699.65 701.18 0.15 90: 717.57 722.69 721.15 0.14 99: 752.90 748.29 748.29 0.00 99.7: 1872.38 1743.36 1780.22 1.39 ------------------------------------------------- ------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 350.59 353.66 353.41 0.05 90: 691.46 701.18 697.60 0.34 99: 732.42 733.95 729.34 0.42 99.7: 1377.79 1279.49 1302.02 1.16 ------------------------------------------------- ------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 342.40 344.96 344.45 0.10 90: 357.25 360.32 359.04 0.24 99: 688.38 691.97 691.46 0.05 99.7: 1376.77 1480.19 1383.94 4.43 ------------------------------------------------- ------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 343.68 345.47 346.24 0.15 90: 360.06 362.11 363.14 0.19 99: 694.02 698.62 699.14 0.05 99.7: 1400.32 1510.91 1435.14 3.40 ------------------------------------------------- ------------------------------------------------- --------------
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain
This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop
, you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 25 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 21 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 14 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal()
if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint
into the code because thread dumps are only reported at safe-points.
نتائج:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | إنتاجية |
< 30 microseconds 99.3% of the time | 7 million message per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 milliseconds 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million message per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.