https://player.vimeo.com/video/201989439
Chronicle Queue เป็นเฟรมเวิร์กการส่งข้อความที่มีความล่าช้าต่ำอย่างต่อเนื่องสำหรับการใช้งานที่มีประสิทธิภาพสูง
โครงการนี้ครอบคลุมคิวพงศาวดารรุ่น Java รุ่น C ++ ของโครงการนี้มีให้บริการและรองรับการทำงานร่วมกันของ Java/C ++ รวมถึงการผูกภาษาเพิ่มเติมเช่น Python หากคุณสนใจประเมินเวอร์ชัน C ++ โปรดติดต่อ [email protected]
เมื่อมองแวบแรกคิวพงศาวดารสามารถมองเห็นได้ว่าเป็นเพียง การใช้คิวอีกครั้ง อย่างไรก็ตามมันมีตัวเลือกการออกแบบที่สำคัญที่ควรเน้น การใช้ ที่เก็บข้อมูลนอกกอง คิวพงศาวดารให้สภาพแวดล้อมที่แอปพลิเคชันไม่ได้รับผลกระทบจากการรวบรวมขยะ (GC) เมื่อใช้แอพพลิเคชั่นที่มีประสิทธิภาพสูงและใช้หน่วยความจำ (คุณได้ยินคำว่า "BigData"?) ใน Java หนึ่งในปัญหาที่ใหญ่ที่สุดคือการรวบรวมขยะ
คิวพงศาวดารอนุญาตให้เพิ่มข้อความในตอนท้ายของคิว ("ผนวก") อ่านจากคิว ("หาง") และยังสนับสนุนการค้นหาแบบสุ่ม
คุณสามารถพิจารณาคิวพงศาวดารที่คล้ายกับหัวข้อที่ทนทาน/ยังคงอยู่ในเวลาแฝงน้อยซึ่งสามารถมีข้อความประเภทและขนาดที่แตกต่างกันได้ คิวพงศาวดารเป็นคิวที่ไม่มีขอบเขตที่กระจายอยู่ที่:
รองรับ RMI แบบอะซิงโครนัสและเผยแพร่/สมัครสมาชิกอินเตอร์เฟสด้วยเวลาแฝง microsecond
ส่งข้อความระหว่าง JVMs ภายใต้ไมโครวินาที
ส่งข้อความระหว่าง JVMs บนเครื่องจักรที่แตกต่างกันผ่านการจำลองแบบในระดับต่ำกว่า 10 ไมโครวินาที (คุณสมบัติองค์กร)
ให้เวลาแฝงแบบเรียลไทม์ที่มีความเสถียรและนุ่มนวลลงในข้อความนับล้านต่อวินาทีสำหรับเธรดเดียวไปยังหนึ่งคิว ด้วยการสั่งซื้อทั้งหมดของทุกเหตุการณ์
เมื่อเผยแพร่ข้อความ 40 ไบต์เปอร์เซ็นต์ที่สูงของเวลาที่เราบรรลุเวลาแฝงภายใต้ 1 ไมโครวินาที เวลาแฝงเปอร์เซ็นไทล์ 99 นั้นเป็น 1 ใน 1 ใน 100 และเปอร์เซ็นไทล์ 99.9 นั้นเป็น 1 ใน 1,000 แฝง
ขนาดแบทช์ | 10 ล้านเหตุการณ์ต่อนาที | 60 ล้านเหตุการณ์ต่อนาที | 100 ล้านเหตุการณ์ต่อนาที |
---|---|---|---|
99%ile | 0.78 µs | 0.78 µs | 1.2 µs |
99.9%ile | 1.2 µs | 1.3 µs | 1.5 µs |
ขนาดแบทช์ | 10 ล้านเหตุการณ์ต่อนาที | 60 ล้านเหตุการณ์ต่อนาที | 100 ล้านเหตุการณ์ต่อนาที |
---|---|---|---|
99%ile | 20 µs | 28 µs | 176 µs |
99.9%ile | 901 µs | 705 µs | 5,370 µs |
บันทึก | เหตุการณ์ 100 ล้านครั้งต่อนาทีคือการส่งเหตุการณ์ทุก 660 นาโนวินาที ทำซ้ำและคงอยู่ |
สำคัญ | ประสิทธิภาพนี้ไม่สามารถทำได้โดยใช้ เครื่องจักรขนาดใหญ่ นี่คือการใช้หนึ่งเธรดเพื่อเผยแพร่และหนึ่งเธรดเพื่อบริโภค |
คิวพงศาวดารถูกออกแบบมาเพื่อ:
เป็น "บันทึกทุกอย่างร้านค้า" ซึ่งสามารถอ่านได้ด้วยเวลาแฝงแบบเรียลไทม์ไมโครวินาที สิ่งนี้รองรับแม้กระทั่งระบบการซื้อขายความถี่สูงที่ต้องการมากที่สุด อย่างไรก็ตามสามารถใช้ในแอปพลิเคชันใด ๆ ที่การบันทึกข้อมูลเป็นข้อกังวล
สนับสนุนการจำลองแบบที่เชื่อถือได้พร้อมการแจ้งเตือนถึง Appender (ผู้เขียนข้อความ) หรือตัวตัดแต่ง (ผู้อ่านข้อความ) เมื่อข้อความได้รับการจำลองแบบสำเร็จแล้ว
คิวพงศาวดารถือว่าพื้นที่ดิสก์ราคาถูกเมื่อเทียบกับหน่วยความจำ คิวพงศาวดารใช้พื้นที่ดิสก์ที่คุณมีอย่างเต็มที่และดังนั้นคุณจึงไม่ได้ถูก จำกัด ด้วยหน่วยความจำหลักของเครื่อง หากคุณใช้การปั่น HDD คุณสามารถเก็บพื้นที่ดิสก์จำนวนมากของ TBS ในราคาเพียงเล็กน้อย
ซอฟต์แวร์พิเศษเพียงอย่างเดียวที่คิวพงศาวดารต้องเรียกใช้คือระบบปฏิบัติการ มันไม่มีนายหน้า แต่ใช้ระบบปฏิบัติการของคุณเพื่อทำงานทั้งหมด หากแอปพลิเคชันของคุณตายระบบปฏิบัติการจะทำงานต่อไปอีกนานดังนั้นจึงไม่มีข้อมูลที่หายไป แม้จะไม่มีการจำลองแบบ
ในขณะที่ Chronicle Queue เก็บข้อมูลทั้งหมดที่บันทึกไว้ในไฟล์ที่แมปหน่วยความจำสิ่งนี้มีค่าใช้จ่ายบนกองอยู่เล็กน้อยแม้ว่าคุณจะมีข้อมูลมากกว่า 100 TB ก็ตาม
Chronicle ใช้ความพยายามอย่างมีนัยสำคัญในการบรรลุความล่าช้าที่ต่ำมาก ในผลิตภัณฑ์อื่น ๆ ที่มุ่งเน้นไปที่การสนับสนุนเว็บแอปพลิเคชันเวลาแฝงน้อยกว่า 40 มิลลิวินาทีนั้นดีกว่าที่คุณเห็น ตัวอย่างเช่นอัตราเฟรมของโรงภาพยนตร์คือ 24 Hz หรือประมาณ 40 มิลลิวินาที
คิวพงศาวดารมีจุดมุ่งหมายเพื่อให้ได้ความหน่วงแฝงต่ำกว่า 40 ไมโครวินาทีสำหรับ 99% ถึง 99.99% ของเวลา การใช้คิวพงศาวดารโดยไม่มีการจำลองแบบเราสนับสนุนแอพพลิเคชั่นที่มีเวลาแฝงต่ำกว่า 40 ไมโครวินาทีต่อไปในหลายบริการ บ่อยครั้งที่ความล่าช้า 99% ของคิวพงศาวดารขึ้นอยู่กับทางเลือกของระบบปฏิบัติการและระบบย่อยฮาร์ดดิสก์
การจำลองแบบสำหรับคิวพงศาวดารรองรับพงศาวดาร สิ่งนี้รองรับการบีบอัดแบบเรียลไทม์ซึ่งคำนวณเดลต้าสำหรับวัตถุแต่ละชิ้นตามที่เขียนไว้ สิ่งนี้สามารถลดขนาดของข้อความได้ด้วยปัจจัย 10 หรือดีกว่าโดยไม่จำเป็นต้องแบทช์ นั่นคือโดยไม่ต้องแนะนำเวลาแฝงที่สำคัญ
คิวพงศาวดารยังรองรับการบีบอัด LZW, Snappy และ GZIP อย่างไรก็ตามรูปแบบเหล่านี้เพิ่มเวลาแฝงที่สำคัญ สิ่งเหล่านี้มีประโยชน์เฉพาะในกรณีที่คุณมีข้อ จำกัด ที่เข้มงวดเกี่ยวกับแบนด์วิดท์เครือข่าย
คิวพงศาวดารรองรับความหมายจำนวนมาก:
ทุกข้อความจะถูกเล่นซ้ำเมื่อรีสตาร์ท
มีเพียงข้อความใหม่เท่านั้นที่เล่นในการรีสตาร์ท
รีสตาร์ทจากจุดใด ๆ ที่รู้จักโดยใช้ดัชนีของรายการ
เล่นซ้ำเฉพาะข้อความที่คุณพลาด สิ่งนี้ได้รับการสนับสนุนโดยตรงโดยใช้ MethodReader/MethodWriter Builders
ในระบบระบบส่วนใหญ่ System.nanoTime()
มีจำนวนนาโนวินาทีประมาณตั้งแต่ระบบรีบูตครั้งล่าสุด (แม้ว่า JVM ที่แตกต่างกันอาจทำงานแตกต่างกัน) นี่เป็นสิ่งเดียวกันใน JVMs บนเครื่องเดียวกัน แต่แตกต่างกันอย่างดุเดือดระหว่างเครื่องจักร ความแตกต่างที่แน่นอนเมื่อพูดถึงเครื่องจักรนั้นไม่มีความหมาย อย่างไรก็ตามข้อมูลสามารถใช้ในการตรวจจับค่าผิดปกติ คุณไม่สามารถระบุได้ว่าเวลาแฝงที่ดีที่สุดคืออะไร แต่คุณสามารถระบุได้ว่าคุณเป็นเวลาแฝงที่ดีที่สุดเท่าไหร่ สิ่งนี้มีประโยชน์หากคุณมุ่งเน้นไปที่เวลาแฝงเปอร์เซ็นไทล์ที่ 99 เรามีคลาสที่เรียกว่า RunningMinimum
เพื่อรับการกำหนดเวลาจากเครื่องจักรที่แตกต่างกันในขณะที่ชดเชยการดริฟท์ใน nanoTime
ระหว่างเครื่อง ยิ่งคุณทำการวัดบ่อยเท่าไหร่ก็ยิ่งมีความแม่นยำน้อยที่สุด
คิวพงศาวดารจัดการการจัดเก็บโดยรอบ คุณสามารถเพิ่ม StoreFileListener
ซึ่งจะแจ้งให้คุณทราบเมื่อมีการเพิ่มไฟล์และเมื่อไม่เก็บรักษาไว้อีกต่อไป คุณสามารถย้ายบีบอัดหรือลบข้อความทั้งหมดเป็นเวลาหนึ่งวันในครั้งเดียว หมายเหตุ: น่าเสียดายที่ Windows หากการดำเนินการ IO ถูกขัดจังหวะมันสามารถปิด filechannel พื้นฐานได้
เนื่องจากเหตุผลด้านประสิทธิภาพเราได้ลบการตรวจสอบการขัดจังหวะในรหัสคิวพงศาวดาร ด้วยเหตุนี้เราขอแนะนำให้คุณหลีกเลี่ยงการใช้คิวพงศาวดารด้วยรหัสที่สร้างการขัดจังหวะ หากคุณไม่สามารถหลีกเลี่ยงการขัดจังหวะการขัดจังหวะเราขอแนะนำให้คุณสร้างอินสแตนซ์แยกต่างหากของคิวพงศาวดารต่อเธรด
คิวพงศาวดารมักใช้สำหรับระบบผู้ผลิตเป็นศูนย์กลางซึ่งคุณต้องเก็บข้อมูลจำนวนมากไว้หลายวันหรือหลายปี สำหรับสถิติโปรดดูการใช้ Chronicle-Queue
สำคัญ | Chronicle Queue ไม่ รองรับการใช้งานระบบไฟล์เครือข่ายใด ๆ ไม่ว่าจะเป็น NFS, AFS, ที่เก็บข้อมูลที่ใช้ SAN หรือสิ่งอื่นใด เหตุผลของสิ่งนี้คือระบบไฟล์เหล่านั้นไม่ได้ให้บริการดั้งเดิมที่จำเป็นทั้งหมดสำหรับการใช้คิวพงศาวดารไฟล์หน่วยความจำ หากจำเป็นต้องมีเครือข่ายใด ๆ (เช่นเพื่อให้ข้อมูลสามารถเข้าถึงได้สำหรับโฮสต์หลาย ๆ ตัว) วิธีที่รองรับเพียงอย่างเดียวคือการจำลองแบบคิวพงศาวดาร (คุณสมบัติองค์กร) |
ระบบการส่งข้อความส่วนใหญ่เป็นผู้บริโภคเป็นศูนย์กลาง การควบคุมการไหลถูกนำมาใช้เพื่อหลีกเลี่ยงผู้บริโภคที่เคยได้รับมากเกินไป แม้กระทั่งชั่วขณะ ตัวอย่างทั่วไปคือเซิร์ฟเวอร์ที่รองรับผู้ใช้ GUI หลายราย ผู้ใช้เหล่านั้นอาจอยู่ในเครื่องจักรที่แตกต่างกัน (ระบบปฏิบัติการและฮาร์ดแวร์) คุณภาพที่แตกต่างกันของเครือข่าย (เวลาแฝงและแบนด์วิดท์) ทำสิ่งอื่น ๆ อีกมากมายในเวลาที่ต่างกัน ด้วยเหตุนี้จึงสมเหตุสมผลสำหรับผู้บริโภคลูกค้าที่จะบอกผู้ผลิตเมื่อต้องถอยกลับล่าช้าข้อมูลใด ๆ จนกว่าผู้บริโภคจะพร้อมที่จะรับข้อมูลมากขึ้น
Chronicle Queue เป็นโซลูชันผู้ผลิตเป็นศูนย์กลางและทำทุกอย่างที่เป็นไปได้ที่จะไม่ผลักดันให้ผู้ผลิตกลับมาหรือบอกให้ช้าลง สิ่งนี้ทำให้เป็นเครื่องมือที่ทรงพลังโดยให้บัฟเฟอร์ขนาดใหญ่ระหว่างระบบของคุณและผู้ผลิตต้นน้ำที่คุณมีการควบคุมเพียงเล็กน้อยหรือไม่
ผู้เผยแพร่ข้อมูลตลาดไม่ได้ให้ตัวเลือกในการผลักดันให้ผู้ผลิตกลับมาเป็นเวลานาน ถ้าทั้งหมด. ผู้ใช้ของเราบางคนบริโภคข้อมูลจาก CME OPRA สิ่งนี้สร้างยอดเขา 10 ล้านเหตุการณ์ต่อนาทีส่งเป็นแพ็คเก็ต UDP โดยไม่ต้องลองใหม่ หากคุณพลาดหรือวางแพ็คเก็ตก็จะหายไป คุณต้องบริโภคและบันทึกแพ็คเก็ตเหล่านั้นให้เร็วที่สุดเท่าที่พวกเขามาหาคุณด้วยการบัฟเฟอร์น้อยมากในอะแดปเตอร์เครือข่าย สำหรับข้อมูลการตลาดโดยเฉพาะอย่างยิ่งเวลาจริงหมายถึงใน ไม่กี่ไมโครวินาที มันไม่ได้หมายถึงภายในวัน (ระหว่างวัน)
คิวพงศาวดารนั้นรวดเร็วและมีประสิทธิภาพและถูกนำมาใช้เพื่อเพิ่มความเร็วที่ข้อมูลถูกส่งผ่านระหว่างเธรด นอกจากนี้ยังเก็บบันทึกทุกข้อความที่ผ่านมาช่วยให้คุณลดปริมาณการบันทึกที่คุณต้องทำ
ระบบการปฏิบัติตามกฎระเบียบเป็นสิ่งจำเป็นโดยระบบมากขึ้นเรื่อย ๆ ในทุกวันนี้ ทุกคนต้องมีพวกเขา แต่ไม่มีใครอยากจะชะลอตัวลง ด้วยการใช้คิวพงศาวดารเพื่อบัฟเฟอร์ข้อมูลระหว่างระบบที่ถูกตรวจสอบและระบบการปฏิบัติตามกฎระเบียบคุณไม่จำเป็นต้องกังวลเกี่ยวกับผลกระทบของการบันทึกการปฏิบัติตามกฎระเบียบสำหรับระบบที่ตรวจสอบของคุณ อีกครั้งคิวพงศาวดารสามารถรองรับเหตุการณ์หลายล้านรายการต่อวินาทีต่อเซิร์ฟเวอร์และข้อมูลการเข้าถึงที่เก็บรักษาไว้เป็นเวลาหลายปี
พงศาวดารคิวรองรับ IPC ที่แฝงอยู่ต่ำ (การสื่อสารระหว่างกระบวนการ) ระหว่าง JVMs บนเครื่องเดียวกันตามลำดับขนาด 1 ไมโครวินาที; เช่นเดียวกับระหว่างเครื่องที่มีเวลาแฝงทั่วไป 10 ไมโครวินาทีสำหรับปริมาณงานเล็กน้อยไม่กี่แสน คิวพงศาวดารรองรับปริมาณงานหลายล้านเหตุการณ์ต่อวินาทีด้วยเวลาแฝงไมโครวินาทีที่เสถียร
ดูบทความเกี่ยวกับการใช้คิวพงศาวดารใน microservices
คิวพงศาวดารสามารถใช้ในการสร้างเครื่องจักรของรัฐ ข้อมูลทั้งหมดเกี่ยวกับสถานะของส่วนประกอบเหล่านั้นสามารถทำซ้ำภายนอกได้โดยไม่ต้องเข้าถึงส่วนประกอบโดยตรงหรือไปยังสถานะของพวกเขา สิ่งนี้จะช่วยลดความจำเป็นในการบันทึกเพิ่มเติม อย่างไรก็ตามการบันทึกใด ๆ ที่คุณต้องการสามารถบันทึกได้อย่างละเอียด สิ่งนี้ทำให้การเปิดใช้งานการล็อกอิน DEBUG
ในการผลิตในทางปฏิบัติ นี่เป็นเพราะค่าใช้จ่ายในการตัดไม้ต่ำมาก น้อยกว่า 10 ไมโครวินาที บันทึกสามารถทำซ้ำส่วนกลางสำหรับการรวมบันทึก คิวพงศาวดารถูกใช้เพื่อจัดเก็บข้อมูลมากกว่า 100 TB ซึ่งสามารถเล่นซ้ำได้จากทุกจุดในเวลา
ส่วนประกอบสตรีมมิ่งที่ไม่ติดตั้งนั้นมีประสิทธิภาพสูงกำหนดและทำซ้ำได้ คุณสามารถทำซ้ำแมลงซึ่งปรากฏขึ้นหลังจากมีการเล่นเป็นล้านเหตุการณ์ในลำดับที่เฉพาะเจาะจงพร้อมกับการกำหนดเวลาที่สมจริง สิ่งนี้ทำให้การใช้การประมวลผลสตรีมน่าสนใจสำหรับระบบที่ต้องการผลลัพธ์ที่มีคุณภาพในระดับสูง
มีการเปิดตัวใน Maven Central As:
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >
ดูบันทึกย่อการเปิดตัวคิวพงศาวดารและรับหมายเลขเวอร์ชันล่าสุด สแน็ปช็อตมีอยู่ใน https://oss.sonatype.org
บันทึก | คลาสที่อาศัยอยู่ในแพ็คเกจใด ๆ 'ภายใน', 'Impl' และ 'Main' (หลังที่มีวิธีการหลักที่รันได้หลากหลาย) และแพคเกจย่อยใด ๆ ไม่ได้เป็นส่วนหนึ่งของ API สาธารณะและ อาจมีการเปลี่ยนแปลงใด ๆ เวลาไม่ว่าด้วยเหตุผลใดก็ตาม ดูไฟล์ package-info.java ที่เกี่ยวข้องสำหรับรายละเอียด |
ในพงศาวดารคิว V5 tailers ตอนนี้อ่านอย่างเดียวในพงศาวดารคิว V4 เรามีแนวคิดของการจัดทำดัชนีขี้เกียจโดยที่ไส้ติ่งจะไม่เขียนดัชนี แต่แทนที่จะทำดัชนีโดยผู้ปรับ เราตัดสินใจที่จะวางดัชนีขี้เกียจใน V5; การทำส่วนเสริมอ่านอย่างเดียวไม่เพียง แต่ทำให้คิวพงศาวดารง่ายขึ้น แต่ยังช่วยให้เราสามารถเพิ่มการปรับให้เหมาะสมในที่อื่นในรหัส
รูปแบบการล็อคของคิวพงศาวดารมีการเปลี่ยนแปลงใน V5 ในคิวพงศาวดาร V4 ล็อคการเขียน (เพื่อป้องกันการเขียนพร้อมกันไปยังคิว) มีอยู่ในไฟล์. cq4 ใน V5 สิ่งนี้ถูกย้ายไปยังไฟล์เดียวที่เรียกว่า Table Store (metadata.cq4t) สิ่งนี้จะทำให้รหัสล็อคง่ายขึ้นภายในเนื่องจากต้องมีการตรวจสอบไฟล์ Table Store เท่านั้น
คุณสามารถใช้ Chronicle Queue V5 เพื่ออ่านข้อความที่เขียนด้วย Chronicle Queue V4 แต่สิ่งนี้ไม่ได้รับประกันว่าจะใช้งานได้เสมอ - ถ้าคุณสร้างคิว V4 ของคุณด้วย wireType(WireType.FIELDLESS_BINARY)
อ่านส่วนหัวของคิว เรามีการทดสอบบางอย่างสำหรับการอ่านคิว V4 แต่สิ่งเหล่านี้มี จำกัด และสถานการณ์ทั้งหมดอาจไม่ได้รับการสนับสนุน
คุณไม่สามารถใช้พงศาวดารคิว V5 เพื่อเขียนถึงคิวพงศาวดาร V4 คิว
Chronicle Queue V4 เป็นการเขียนคิวพงศาวดารใหม่ที่สมบูรณ์ซึ่งแก้ปัญหาต่อไปนี้ที่มีอยู่ใน V3
หากไม่มีข้อความที่อธิบายตัวเองผู้ใช้จะต้องสร้างฟังก์ชั่นของตนเองสำหรับการทิ้งข้อความและการจัดเก็บข้อมูลระยะยาว ด้วย V4 คุณไม่ต้องทำสิ่งนี้ แต่คุณสามารถทำได้หากคุณต้องการ
คิว Vanilla Chronicle จะสร้างไฟล์ต่อเธรด อย่างไรก็ตามจำนวนเธรดถูกควบคุมอย่างไรก็ตามแอปพลิเคชันจำนวนมากมีการควบคุมเพียงเล็กน้อยหรือไม่มีเลยว่ามีกี่เธรดที่ใช้และสิ่งนี้ทำให้เกิดปัญหาการใช้งาน
การกำหนดค่าสำหรับการจัดทำดัชนีและ Vanilla Chronicle นั้นอยู่ในรหัสทั้งหมดดังนั้นผู้อ่านจะต้องมีการกำหนดค่าเช่นเดียวกับนักเขียนและไม่ชัดเจนเสมอไปว่ามันคืออะไร
ไม่มีทางที่ผู้ผลิตจะรู้ว่าข้อมูลได้ถูกจำลองไปยังเครื่อง A ที่สองเท่าไหร่ วิธีแก้ปัญหาเพียงอย่างเดียวคือการทำซ้ำข้อมูลกลับไปยังผู้ผลิต
คุณต้องระบุขนาดของข้อมูลเพื่อสำรองก่อนที่คุณจะเริ่มเขียนข้อความของคุณ
คุณต้องทำการล็อคของคุณเองสำหรับ Appender เมื่อใช้ Chronicle ที่จัดทำดัชนี
ในพงศาวดารคิว V3 ทุกอย่างอยู่ในแง่ของไบต์ไม่ใช่สาย มีสองวิธีในการใช้ไบต์ในพงศาวดารคิว V4 คุณสามารถใช้วิธี writeBytes
และ readBytes
หรือคุณสามารถรับ bytes()
จากสาย ตัวอย่างเช่น:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ()));
try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}
Chronicle Queue Enterprise Edition เป็นรุ่นที่ได้รับการสนับสนุนเชิงพาณิชย์ของคิวโอเพ่นซอร์สโครนิเคิลที่ประสบความสำเร็จของเรา เอกสารโอเพนซอร์สถูกขยายออกไปโดยเอกสารต่อไปนี้เพื่ออธิบายคุณสมบัติเพิ่มเติมที่มีอยู่เมื่อคุณได้รับใบอนุญาตสำหรับ Enterprise Edition นี่คือ:
การเข้ารหัสคิวข้อความและข้อความ สำหรับข้อมูลเพิ่มเติมดูเอกสารการเข้ารหัส
การจำลองแบบ TCP/IP (และตัวเลือก UDP) ระหว่างโฮสต์เพื่อให้แน่ใจว่าข้อมูลการสำรองข้อมูลคิวทั้งหมดของคุณตามเวลาจริง สำหรับข้อมูลเพิ่มเติมดูเอกสารประกอบการจำลองแบบโปรโตคอลการจำลองแบบคิวจะครอบคลุมในโปรโตคอลการจำลองแบบ
การสนับสนุนเขตเวลาสำหรับการจัดตารางการโรลโอเวอร์คิวรายวัน สำหรับข้อมูลเพิ่มเติมดูการสนับสนุนเขตเวลา
โหมด Async รองรับประสิทธิภาพที่ดีขึ้นที่ปริมาณงานสูงในระบบไฟล์ที่ช้าลง สำหรับข้อมูลเพิ่มเติมโปรดดูที่โหมด Async และประสิทธิภาพ
pre-toucher สำหรับการปรับปรุงค่าผิดปกติดู pre-toucher และการกำหนดค่าของมัน
นอกจากนี้คุณจะได้รับการสนับสนุนอย่างเต็มที่จากผู้เชี่ยวชาญด้านเทคนิคของเรา
สำหรับข้อมูลเพิ่มเติมเกี่ยวกับ Chronicle Queue Enterprise Edition โปรดติดต่อ [email protected]
คิวพงศาวดารถูกกำหนดโดย SingleChronicleQueue.class
ที่ออกแบบมาเพื่อสนับสนุน:
กลิ้งไฟล์เป็นรายวันรายสัปดาห์หรือรายชั่วโมง
นักเขียนพร้อมกันในเครื่องเดียวกัน
เครื่องอ่านที่เกิดขึ้นพร้อมกันในเครื่องเดียวกันหรือในหลาย ๆ เครื่องผ่านการจำลองแบบ TCP (พร้อมพงศาวดารคิว Enterprise)
ผู้อ่านและนักเขียนที่เกิดขึ้นพร้อมกันระหว่าง Docker หรือเวิร์กโหลดคอนเทนเนอร์อื่น ๆ
การคัดลอกอนุกรมและ deserialization เป็นศูนย์
การเขียน/อ่านหลายล้านรายการต่อวินาทีในฮาร์ดแวร์สินค้าโภคภัณฑ์
ประมาณ 5 ล้านข้อความ/วินาทีสำหรับข้อความ 96 ไบต์บนโปรเซสเซอร์ i7-4790 โครงสร้างไดเรกทอรีคิวมีดังนี้:
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling.
รูปแบบประกอบด้วยไบต์ขนาดที่กำหนดขนาดซึ่งจัดรูปแบบโดยใช้ BinaryWire
หรือ TextWire
คิวพงศาวดารถูกออกแบบมาให้ขับเคลื่อนจากรหัส คุณสามารถเพิ่มอินเทอร์เฟซที่เหมาะสมกับความต้องการของคุณได้อย่างง่ายดาย
บันทึก | เนื่องจากการดำเนินงานที่ค่อนข้างต่ำ เพื่อป้องกันการเสียชีวิตของเธรดอาจเป็นประโยชน์ในการจับ RuntimeExceptions และบันทึก/วิเคราะห์ตามความเหมาะสม |
บันทึก | สำหรับการสาธิตวิธีการใช้คิวพงศาวดารดูการสาธิตคิวพงศาวดารและสำหรับเอกสาร Java ดูคิวพง |
ในส่วนต่อไปนี้ก่อนอื่นเราแนะนำคำศัพท์และการอ้างอิงอย่างรวดเร็วเพื่อใช้คิวพงศาวดาร จากนั้นเราให้คำแนะนำโดยละเอียดเพิ่มเติม
Chronicle Queue เป็นวารสารข้อความที่คงอยู่ซึ่งรองรับนักเขียนและผู้อ่านที่เกิดขึ้นพร้อมกันแม้กระทั่งในหลาย JVMs ในเครื่องเดียวกัน ผู้อ่านทุกคนเห็นทุกข้อความและผู้อ่านสามารถเข้าร่วมได้ตลอดเวลาและยังเห็นทุกข้อความ
บันทึก | เราจงใจหลีกเลี่ยงคำว่า ผู้บริโภค และใช้ ผู้อ่าน เป็นข้อความไม่ได้ถูกใช้/ถูกทำลายโดยการอ่าน |
คิวพงศาวดารมีแนวคิดหลักดังต่อไปนี้:
ข้อความที่ตัดตอนมา
ข้อความที่ตัดตอนมาเป็นคอนเทนเนอร์ข้อมูลหลักในคิวพงศาวดาร กล่าวอีกนัยหนึ่งคิวพงศาวดารแต่ละคิวประกอบด้วยข้อความที่ตัดตอนมา การเขียนข้อความไปยังคิวพงศาวดารหมายถึงการเริ่มต้นข้อความที่ตัดตอนมาใหม่การเขียนข้อความลงไปและจบข้อความที่ตัดตอนมาในตอนท้าย
ส่ง
Appender คือแหล่งที่มาของข้อความ บางอย่างเช่นตัววนซ้ำในสภาพแวดล้อมพงศาวดาร คุณเพิ่มข้อมูลต่อท้ายคิวพงศาวดารปัจจุบัน มันสามารถดำเนินการเขียนตามลำดับโดยผนวกเข้ากับจุดสิ้นสุดของคิวเท่านั้น ไม่มีวิธีแทรกหรือลบข้อความที่ตัดตอนมา
ผู้ตัดแต่ง
Tailer เป็นเครื่องอ่านที่ตัดตอนมาได้ดีที่สุดสำหรับการอ่านตามลำดับ มันสามารถดำเนินการอ่านตามลำดับและแบบสุ่มทั้งไปข้างหน้าและข้างหลัง Tailers อ่านข้อความถัดไปที่มีอยู่ทุกครั้งที่เรียก รับประกันต่อไปนี้ในคิวพงศาวดาร:
สำหรับแต่ละ Appender ข้อความจะถูกเขียนตามลำดับที่ Appender เขียนไว้ ข้อความโดยผู้เข้าต่างกันที่แตกต่างกันนั้นเป็น interleaved
สำหรับแต่ละ คน จะเห็นข้อความทุกข้อความสำหรับหัวข้อในลำดับเดียวกับคนอื่น ๆ ทุกคน
เมื่อทำซ้ำทุกแบบจำลองจะมีสำเนาของทุกข้อความ
คิวพงศาวดารเป็นนายหน้าน้อย หากคุณต้องการสถาปัตยกรรมที่มีนายหน้าโปรดติดต่อ [email protected]
ไฟล์กลิ้งไฟล์และคิว
Chronicle Queue ได้รับการออกแบบมาเพื่อม้วนไฟล์ขึ้นอยู่กับวงจรม้วนที่เลือกเมื่อสร้างคิว (ดู RollCycles) กล่าวอีกนัยหนึ่งไฟล์คิวถูกสร้างขึ้นสำหรับแต่ละรอบม้วนซึ่งมีส่วนขยาย cq4
เมื่อวงจรม้วนมาถึงจุดที่ควรจะม้วน Appender จะเขียนเครื่องหมาย EOF
แบบอะตอมในตอนท้ายของไฟล์ปัจจุบันเพื่อระบุว่าไม่มีผู้คัดค้านคนอื่นที่ควรเขียนลงในไฟล์นี้และไม่ควรอ่านเพิ่มเติมและทุกคนควรใช้ไฟล์ใหม่แทน
หากกระบวนการถูกปิดตัวลงและรีสตาร์ทในภายหลังเมื่อวงจรม้วนควรใช้ไฟล์ใหม่ Appender จะพยายามค้นหาไฟล์เก่าและเขียนเครื่องหมาย EOF
ในไฟล์เหล่านั้นเพื่อช่วยปรับแต่งการอ่าน
หัวข้อ
แต่ละหัวข้อเป็นไดเรกทอรีของไฟล์คิว หากคุณมีหัวข้อที่เรียกว่า mytopic
เค้าโครงอาจมีลักษณะเช่นนี้:
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4
ในการคัดลอกข้อมูลทั้งหมดสำหรับวันเดียว (หรือรอบ) คุณสามารถคัดลอกไฟล์สำหรับวันนั้นไปยังเครื่องพัฒนาของคุณสำหรับการทดสอบซ้ำ
ข้อ จำกัด ในหัวข้อและข้อความ
หัวข้อถูก จำกัด ให้เป็นสตริงซึ่งสามารถใช้เป็นชื่อไดเรกทอรี ภายในหัวข้อคุณสามารถมีหัวข้อย่อยซึ่งสามารถเป็นประเภทข้อมูลใด ๆ ที่สามารถเป็นอนุกรม ข้อความสามารถเป็นข้อมูลที่เป็นอนุกรมใด ๆ
คิวพงศาวดารสนับสนุน:
วัตถุ Serializable
แม้ว่าจะต้องหลีกเลี่ยงเนื่องจากไม่มีประสิทธิภาพ
วัตถุ Externalizable
เป็นที่ต้องการหากคุณต้องการใช้ Java APIs มาตรฐาน
byte[]
และ String
Marshallable
; ข้อความที่อธิบายตัวเองซึ่งสามารถเขียนเป็น Yaml, Binary Yaml หรือ JSON
BytesMarshallable
ซึ่งเป็นไบนารีระดับต่ำหรือการเข้ารหัสข้อความ
ส่วนนี้ให้การอ้างอิงอย่างรวดเร็วสำหรับการใช้คิวพงศาวดารเพื่อแสดงวิธีการสร้างเขียน/อ่านเข้าไปใน/จากคิว
การก่อสร้างคิวพงศาวดาร
การสร้างอินสแตนซ์ของคิวพงศาวดารนั้นแตกต่างจากการเรียกตัวสร้าง ในการสร้างอินสแตนซ์คุณต้องใช้ ChronicleQueueBuilder
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build ();
ในตัวอย่างนี้เราได้สร้าง IndexedChronicle
ซึ่งสร้างสอง RandomAccessFiles
หนึ่งสำหรับดัชนีและหนึ่งสำหรับข้อมูลที่มีชื่อค่อนข้าง:
${java.io.tmpdir}/getting-started/{today}.cq4
เขียนถึงคิว
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
อ่านจากคิว
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
นอกจากนี้วิธี ChronicleQueue.dump()
สามารถใช้ในการทิ้งเนื้อหาดิบเป็นสตริง
queue . dump ();
การทำความสะอาด
Chronicle Queue เก็บข้อมูลไว้นอกกองและขอแนะนำให้คุณโทร close()
เมื่อคุณทำงานกับคิวพงศาวดารเสร็จแล้วไปยังทรัพยากรฟรี
บันทึก | จะไม่มีข้อมูลหายไปหากคุณทำเช่นนี้ นี่เป็นเพียงการทำความสะอาดทรัพยากรที่ใช้ |
queue . close ();
รวมเข้าด้วยกัน
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
}
คุณสามารถกำหนดค่าคิวพงศาวดารโดยใช้พารามิเตอร์การกำหนดค่าหรือคุณสมบัติของระบบ นอกจากนี้ยังมีวิธีการเขียน/การอ่านที่แตกต่างกันใน/จากคิวเช่นการใช้พร็อกซีและการใช้ MethodReader
และ MethodWriter
พงศาวดารคิว (CQ) สามารถกำหนดค่าได้ผ่านหลายวิธีในชั้นเรียน SingleChronicleQueueBuilder
พารามิเตอร์บางอย่างที่ลูกค้าของเราสอบถามได้อธิบายไว้ด้านล่าง
ม้วน
พารามิเตอร์ RollCycle
กำหนดค่าอัตราที่ CQ จะม้วนไฟล์คิวพื้นฐาน ตัวอย่างเช่นการใช้ตัวอย่างโค้ดต่อไปนี้จะส่งผลให้ไฟล์คิวถูกรีด (เช่นไฟล์ใหม่ที่สร้างขึ้น) ทุกชั่วโมง:
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build ()
เมื่อมีการตั้งค่ารอบม้วนของคิวแล้วจะไม่สามารถเปลี่ยนแปลงได้ในภายหลัง อินสแตนซ์เพิ่มเติมใด ๆ ของ SingleChronicleQueue
ที่กำหนดค่าให้ใช้เส้นทางเดียวกันควรกำหนดค่าให้ใช้วงจรม้วนเดียวกันและหากไม่ได้เป็นเช่นนั้นวงจรม้วนจะได้รับการปรับปรุงให้ตรงกับม้วนวงจรที่คงอยู่ ในกรณีนี้ข้อความบันทึกคำเตือนจะถูกพิมพ์เพื่อแจ้งผู้ใช้ห้องสมุดเกี่ยวกับสถานการณ์:
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}
เอาต์พุตคอนโซล:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.
จำนวนข้อความสูงสุดที่สามารถเก็บไว้ในไฟล์คิวขึ้นอยู่กับวัฏจักรม้วน ดูคำถามที่พบบ่อยสำหรับข้อมูลเพิ่มเติมเกี่ยวกับเรื่องนี้
ในคิวพงศาวดารเวลาโรลโอเวอร์ขึ้นอยู่กับ UTC คุณลักษณะของ Zone Rollover Enterprise ขยายความสามารถของคิวพงศาวดารในการระบุเวลาและระยะเวลาของคิวโรลโอเวอร์มากกว่า UTC สำหรับข้อมูลเพิ่มเติมโปรดดู Rollover คิว Timezone
คลาส Chronicle Queue FileUtil
ให้วิธีการที่เป็นประโยชน์สำหรับการจัดการไฟล์คิว ดูการจัดการไฟล์ม้วนโดยตรง
สายพันธุ์
เป็นไปได้ที่จะกำหนดค่าวิธีการจัดเก็บคิวพงศาวดารจะจัดเก็บข้อมูลโดยตั้งค่า WireType
อย่างชัดเจน:
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )
ตัวอย่างเช่น:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )
แม้ว่ามันจะเป็นไปได้ที่จะให้ wiretype อย่างชัดเจนเมื่อสร้างตัวสร้าง แต่ก็ไม่ได้รับการสนับสนุนเนื่องจากสายไฟทั้งหมดไม่ได้รับการสนับสนุนโดยคิวพงศาวดาร โดยเฉพาะอย่างยิ่งประเภทลวดต่อไปนี้ไม่ได้รับการสนับสนุน:
ข้อความ (และโดยพื้นฐานแล้วทั้งหมดขึ้นอยู่กับข้อความรวมถึง JSON และ CSV)
ดิบ
read_any
บล็อกขนาด
เมื่อมีการอ่าน/เขียนคิวส่วนหนึ่งของไฟล์ที่กำลังอ่าน/เขียนกำลังถูกแมปเข้ากับเซ็กเมนต์หน่วยความจำ พารามิเตอร์นี้ควบคุมขนาดของบล็อกการแมปหน่วยความจำ คุณสามารถเปลี่ยนพารามิเตอร์นี้โดยใช้วิธีการแบบ SingleChronicleQueueBuilder.blockSize(long blockSize)
หากจำเป็น
บันทึก | คุณควรหลีกเลี่ยงการเปลี่ยน blockSize โดยไม่จำเป็น |
หากคุณกำลังส่งข้อความขนาดใหญ่คุณควรตั้ง blockSize
ขนาดใหญ่เช่น blockSize
ควรมีขนาดข้อความอย่างน้อยสี่เท่า
คำเตือน | หากคุณใช้ blockSize ขนาดเล็กสำหรับข้อความขนาดใหญ่คุณจะได้รับ IllegalStateException และการเขียนถูกยกเลิก |
เราขอแนะนำให้คุณใช้ blockSize
เดียวกันสำหรับแต่ละอินสแตนซ์คิวเมื่อทำซ้ำคิว blockSize
ไม่ได้เขียนไปยังข้อมูลเมตาของคิวดังนั้นควรตั้งค่าเป็นค่าเดียวกันเมื่อสร้างอินสแตนซ์ของคิวพงศาวดาร (แนะนำ แต่ถ้าคุณต้องการ ในการทำงานด้วย blocksize
ที่แตกต่างกันที่คุณสามารถทำได้)
เคล็ดลับ | ใช้ blockSize เดียวกันสำหรับแต่ละอินสแตนซ์ของคิวที่ทำซ้ำ |
indexpacing
พารามิเตอร์นี้แสดงช่องว่างระหว่างข้อความที่ตัดตอนมาซึ่งมีการจัดทำดัชนีอย่างชัดเจน ตัวเลขที่สูงขึ้นหมายถึงประสิทธิภาพการเขียนตามลำดับที่สูงขึ้น ประสิทธิภาพการอ่านตามลำดับไม่ได้รับผลกระทบจากคุณสมบัตินี้ ตัวอย่างเช่นระยะห่างดัชนีเริ่มต้นต่อไปนี้สามารถส่งคืนได้:
16 (ละเอียด)
64 (ทุกวัน)
คุณสามารถเปลี่ยนพารามิเตอร์นี้ได้โดยใช้วิธีการแบบ SingleChronicleQueueBuilder.indexSpacing(int indexSpacing)
indexcount
ขนาดของแต่ละอาร์เรย์ดัชนีรวมถึงจำนวนทั้งหมดของอาร์เรย์ดัชนีต่อไฟล์คิว
บันทึก | IndexCount 2 คือจำนวนสูงสุดของรายการคิวที่จัดทำดัชนี |
บันทึก | ดูการจัดทำดัชนีที่ตัดตอนมาในส่วนในคิวพงศาวดารของคู่มือผู้ใช้นี้สำหรับข้อมูลเพิ่มเติมและตัวอย่างของการใช้ดัชนี |
ReadBufferMode, writeBufferMode
พารามิเตอร์เหล่านี้กำหนด buffermode สำหรับการอ่านหรือเขียนที่มีตัวเลือกต่อไปนี้:
None
- ค่าเริ่มต้น (และหนึ่งเดียวสำหรับผู้ใช้โอเพนซอร์ส) ไม่มีบัฟเฟอร์
Copy
- ใช้ร่วมกับการเข้ารหัส;
Asynchronous
- ใช้บัฟเฟอร์แบบอะซิงโครนัสเมื่ออ่านและ/หรือการเขียนโดยโหมด Chronicle Async
การบัฟฟอร์คเคป
ความจุ ringbuffer ในไบต์เมื่อใช้ bufferMode: Asynchronous
ในคิวพงศาวดารเราอ้างถึงการเขียนข้อมูลของคุณไปยังคิวพงศาวดารเพื่อจัดเก็บข้อความที่ตัดตอนมา ข้อมูลนี้สามารถสร้างขึ้นจากประเภทข้อมูลใด ๆ รวมถึงข้อความตัวเลขหรือ blobs ที่เป็นอนุกรม ในที่สุดข้อมูลทั้งหมดของคุณไม่ว่ามันจะถูกเก็บไว้เป็นชุดของไบต์
ก่อนที่จะเก็บข้อความที่ตัดตอนมาของคุณคิวพงศาวดารขอสงวนส่วนหัว 4 ไบต์ Chronicle Queue เขียนความยาวของข้อมูลของคุณลงในส่วนหัวนี้ ด้วยวิธีนี้เมื่อคิวพงศาวดารมาอ่านข้อความที่ตัดตอนมาของคุณมันจะรู้ว่าข้อมูลแต่ละใบมีเวลานานเท่าใด เราอ้างถึงส่วนหัว 4 ไบต์นี้พร้อมกับข้อความที่ตัดตอนมาของคุณเป็นเอกสาร คิวพงศาวดารที่พูดอย่างเคร่งครัดสามารถใช้ในการอ่านและเขียนเอกสาร
บันทึก | ภายในส่วนหัว 4 ไบต์นี้เรายังสำรองบิตสองสามบิตสำหรับการดำเนินการภายในจำนวนมากเช่นการล็อคเพื่อให้ความปลอดภัยของคิวพงศาวดารทั้งในโปรเซสเซอร์และเธรด สิ่งสำคัญที่ควรทราบคือด้วยเหตุนี้คุณจึงไม่สามารถแปลง 4 ไบต์เป็นจำนวนเต็มเพื่อค้นหาความยาวของข้อมูลของคุณอย่างเคร่งครัด |
ตามที่ระบุไว้ก่อนหน้านี้คิวพงศาวดารใช้ appender เพื่อเขียนลงในคิวและ ตัวตัดแต่ง เพื่ออ่านจากคิว ซึ่งแตกต่างจากโซลูชันการคิว Java อื่น ๆ ข้อความจะไม่หายไปเมื่ออ่านด้วยตัวตัดแต่ง สิ่งนี้ครอบคลุมในรายละเอียดเพิ่มเติมในส่วนด้านล่างเรื่อง "การอ่านจากคิวโดยใช้เทลเลอร์" ในการเขียนข้อมูลไปยังคิวพงศาวดารคุณต้องสร้าง Appender ก่อน:
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}
Chronicle Queue ใช้อินเทอร์เฟซระดับต่ำต่อไปนี้เพื่อเขียนข้อมูล:
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
}
การปิดการทดลองกับทรัพยากรคือจุดที่ความยาวของข้อมูลถูกเขียนไปยังส่วนหัว นอกจากนี้คุณยังสามารถใช้ DocumentContext
เพื่อค้นหาดัชนีที่ข้อมูลของคุณเพิ่งได้รับการกำหนด (ดูด้านล่าง) ในภายหลังคุณสามารถใช้ดัชนีนี้เพื่อย้ายไป/ค้นหาข้อความที่ตัดตอนมานี้ ข้อความที่ตัดตอนมาจากคิวพงศาวดารแต่ละรายการมีดัชนีที่ไม่ซ้ำกัน
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
}
วิธีการระดับสูงด้านล่างเช่น writeText()
เป็นวิธีการสะดวกสบายในการโทรหา appender.writingDocument()
แต่ทั้งสองวิธีทำสิ่งเดียวกันเป็นหลัก รหัสจริงของ writeText(CharSequence text)
มีลักษณะเช่นนี้:
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}
ดังนั้นคุณจึงมีตัวเลือกอินเทอร์เฟซระดับสูงจำนวนหนึ่งลงไปที่ API ระดับต่ำไปยังหน่วยความจำดิบ
นี่คือ API ระดับสูงสุดที่ซ่อนความจริงที่คุณเขียนถึงส่งข้อความเลย ประโยชน์คือคุณสามารถแลกเปลี่ยนการโทรไปยังอินเทอร์เฟซด้วยส่วนประกอบจริงหรืออินเทอร์เฟซกับโปรโตคอลอื่น
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));
คุณสามารถเขียน "ข้อความอธิบายตัวเอง" ข้อความดังกล่าวสามารถรองรับการเปลี่ยนแปลงสคีมา พวกเขายังเข้าใจได้ง่ายขึ้นเมื่อทำการดีบักหรือวินิจฉัยปัญหา
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));
คุณสามารถเขียน "ข้อมูลดิบ" ซึ่งเป็นการอธิบายตัวเอง ประเภทจะถูกต้องเสมอ ตำแหน่งเป็นข้อบ่งชี้เพียงอย่างเดียวเกี่ยวกับความหมายของค่าเหล่านั้น
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));
คุณสามารถเขียน "ข้อมูลดิบ" ซึ่งไม่ได้อธิบายตัวเอง ผู้อ่านของคุณต้องรู้ว่าข้อมูลนี้มีความหมายอย่างไรและประเภทที่ใช้
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));
ด้านล่างวิธีการเขียนระดับต่ำสุดจะแสดงไว้ คุณได้รับที่อยู่สำหรับหน่วยความจำดิบและคุณสามารถเขียนสิ่งที่คุณต้องการ
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});
คุณสามารถพิมพ์เนื้อหาของคิว คุณสามารถดูสองข้อความแรกและสองข้อความสุดท้ายเก็บข้อมูลเดียวกัน
// dump the content of the queue
System . out . println ( queue . dump ());
ภาพพิมพ์:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World
การอ่านคิวเป็นไปตามรูปแบบเดียวกับการเขียนยกเว้นว่ามีความเป็นไปได้ที่จะไม่มีข้อความเมื่อคุณพยายามอ่าน
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
}
คุณสามารถเปลี่ยนแต่ละข้อความเป็นวิธีการโทรตามเนื้อหาของข้อความและให้คิวพงศาวดาร deserialize อาร์กิวเมนต์วิธีการโดยอัตโนมัติ การโทร reader.readOne()
จะข้าม (กรองออก) ข้อความใด ๆ ที่ไม่ตรงกับวิธีการอ่านวิธีการของคุณ
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());
คุณสามารถถอดรหัสข้อความด้วยตัวเอง
บันทึก | ชื่อประเภทและลำดับของฟิลด์ไม่ต้องตรงกัน |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));
คุณสามารถอ่านค่าข้อมูลที่อธิบายตัวเองได้ สิ่งนี้จะตรวจสอบว่าประเภทนั้นถูกต้องและแปลงตามที่ต้องการ
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));
คุณสามารถอ่านข้อมูลดิบเป็นดั้งเดิมและสตริง
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));
หรือคุณสามารถรับที่อยู่หน่วยความจำพื้นฐานและเข้าถึงหน่วยความจำดั้งเดิม
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
}));
บันทึก | ทุกคนเห็นทุกข้อความ |
สามารถเพิ่มสิ่งที่เป็นนามธรรมลงในข้อความกรองหรือกำหนดข้อความให้กับตัวประมวลผลข้อความเพียงตัวเดียว อย่างไรก็ตามโดยทั่วไปคุณต้องการเพียงหนึ่งตัวตัดหลักสำหรับหัวข้อโดยอาจมีบางส่วนรองรับบางส่วนสำหรับการตรวจสอบ ฯลฯ
เนื่องจากคิวพงศาวดารไม่ได้แบ่งหัวข้อหัวข้อคุณจะได้รับการสั่งซื้อทั้งหมดของข้อความทั้งหมดภายในหัวข้อนั้น ในหัวข้อนี้ไม่มีการรับประกันการสั่งซื้อ หากคุณต้องการเล่นซ้ำการกำหนดจากระบบที่ใช้จากหลายหัวข้อเราขอแนะนำให้เล่นซ้ำจากผลลัพธ์ของระบบนั้น
ตัวปรับคิวพงศาวดารอาจสร้างตัวจัดการไฟล์ตัวจัดการไฟล์จะถูกทำความสะอาดเมื่อใดก็ตามที่มีการเรียกใช้วิธี close()
ของคิวพงศาวดารที่เกี่ยวข้องหรือเมื่อใดก็ตามที่ JVM รันการรวบรวมขยะ หากคุณกำลังเขียนรหัสของคุณไม่มี GC หยุดชั่วคราวและคุณต้องการทำความสะอาดตัวจัดการไฟล์อย่างชัดเจนคุณสามารถโทรไปที่สิ่งต่อไปนี้:
(( StoreTailer ) tailer ). releaseResources ()
ExcerptTailer.toEnd()
ในบางแอปพลิเคชันอาจจำเป็นต้องเริ่มอ่านจากปลายคิว (เช่นในสถานการณ์รีสตาร์ท) สำหรับกรณีการใช้งานนี้ ExcerptTailer
ให้วิธีการ toEnd()
เมื่อทิศทางของ tailer ถูก FORWARD
(โดยค่าเริ่มต้นหรือตามที่กำหนดโดยวิธี ExcerptTailer.direction
) จากนั้นการเรียก toEnd()
จะวาง tailer หลังจาก บันทึกล่าสุดที่มีอยู่ในคิว ในกรณีนี้ Tailer พร้อมแล้วสำหรับการอ่านบันทึกใหม่ใด ๆ ต่อท้ายคิว จนกว่าข้อความใหม่จะถูกผนวกเข้ากับคิวจะไม่มี DocumentContext
ใหม่สำหรับการอ่าน:
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();
หากจำเป็นต้องอ่านย้อนกลับผ่านคิวจากตอนท้ายผู้ตัดแต่งสามารถตั้งค่าให้อ่านย้อนหลัง:
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd ();
เมื่ออ่านย้อนกลับวิธีการ toEnd()
จะย้าย tailer ไปยังระเบียนสุดท้ายในคิว หากคิวไม่ว่างเปล่าจะมี DocumentContext
สำหรับการอ่าน:
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();
aka ชื่อ tailers
มันจะเป็นประโยชน์ในการมีตัวตัดแต่งซึ่งดำเนินต่อไปจากจุดที่มันขึ้นอยู่กับการรีสตาร์ทของแอปพลิเคชัน
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}
Tailer "A" อ่านข้อความล่าสุด 2
Tailer "A" อ่านข้อความถัดไป 3
Tailer "b" last reads message 0
Tailer "b" next reads message 1
This is from the RestartableTailerTest
where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart()
, toEnd()
, moveToIndex()
or reads a message.
บันทึก | The direction() is not preserved across restarts, only the next index to be read. |
บันทึก | The index of a tailer is only progressed when the DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4
:
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
This can often be a bit difficult to read, so it is better to dump the cq4
files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain
or net.openhft.chronicle.queue.ChronicleReaderMain
. DumpMain
performs a simple dump to the terminal while ChronicleReaderMain
handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4
file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar
, from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
เคล็ดลับ | To find out which version of jars to include please, refer to the chronicle-bom . |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
This will dump the 19700101-02.cq4
file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining
บันทึก | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh
located in the Chonicle-Queue/bin
-folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/dump_queue.sh <file path>
ChronicleReaderMain
The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain
(in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain
:
usage: ChronicleReaderMain -a <binary-arg> Argument to pass to binary search class -b <binary-search> Use this class as a comparator to binary search -cbl <content-based-limiter> Specify a content-based limiter -cblArg <content-based-limiter-argument> Specify an argument for use by the content-based limiter -d <directory> Directory containing chronicle queue files -e <exclude-regex> Do not display records containing this regular expression -f Tail behaviour - wait for new records to arrive -g Show message history (when using method reader) -h Print this help and exit -i <include-regex> Display records containing this regular expression -k Read the queue in reverse -l Squash each output message into a single line -m <max-history> Show this many records from the end of the data set -n <from-index> Start reading from this index (eg 0x123ABE) -named <named> Named tailer ID -r <as-method-reader> Use when reading from a queue generated using a MethodWriter -s Display index -w <wire-type> Control output ie JSON -x <max-results> Limit the number of results to output -z Print timestamps using the local timezone
Just as with DumpQueue
you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build >
Once the Uber jar is present, you can run ChronicleReaderMain
from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh
which again is located in the Chonicle-Queue/bin
-folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/queue_reader.sh <options>
ChronicleWriter
If using MethodReader
and MethodWriter
then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain
or the shell script queue_writer.sh
eg
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method name
If you want to write to the below "doit" method
public interface MyInterface {
void doit ( DTO dto );
}
public class DTO extends SelfDescribingMarshallable { private int age; ชื่อสตริงส่วนตัว; -
Then you can call ChronicleWriterMain -d queue doit x.yaml
with either (or both) of the below Yamls:
{
age : 19,
name : Henry
}
หรือ
!x.y.z.DTO {
age : 42,
name : Percy
}
If DTO
makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface
, where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}
To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));
These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}
To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());
Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end over across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId
of the queue.
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));
A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}
First write
First read
Write of the result of the read.
What triggered this event.
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender();
the acquireAppender()
uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();
will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()
Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
บันทึก | You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
for more information on this see :
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
Move to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
}
You can add a StoreFileListener
to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY
cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber
providing up to 4 billion entries per day. if more messages per day are anticipated, the XLARGE_DAILY
cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor
class) as the JVM can crash when allocating a new memory mapped file if disk space becomes low enough. The disk space monitor checks (for each FileStore you are using Chronicle Queues on): that there is less than 200MB free. If so you will see:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );
otherwise it will check for the threshold percentage and log out this message:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );
The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously Chronicle Queue stores its data off-heap in a '.cq4' file. So whenever you wish to append data to this file or read data into this file, chronicle queue will create a file handle . Typically, Chronicle Queue will create a new '.cq4' file every day. However, this could be changed so that you can create a new file every hour, every minute or even every second.
If we create a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, If you had written 10 seconds worth of data and then you wish to read this data, chronicle would have to scan across 10 files. To reduce the creation of the file handles, chronicle queue cashes them lazily and when it comes to writing data to the queue files, care-full consideration must be taken when closing the files, because on most OS's a close of the file, will force any data that has been appended to the file, to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher
is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute()
method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown()
method can be called to close the supplied queue and release any other resources. Invocation of the execute()
method after shutdown()
has been called will cause an IllegalStateException
to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle
(defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
คำเตือน | earlyAcquireNextCycle is off by default and if it is going to be turned on, you should very carefully stress test before and after turning it on. Basically what you experience is related to your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs
(defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs
.
SingleChronicleQueueExcerpts.dontWrite
(defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle
from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle
is set to true
and pretoucherPrerollTimeMs
to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" );
The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute()
method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
}
The method close()
, closes the Pretoucher and releases its resources.
pretouch . close ();
บันทึก | The Pretoucher is an Enterprise feature |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
บันทึก | There is a step in latency at around 10 million message per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument()
is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument()
call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close()
is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true);
บันทึก | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException . This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
ผลลัพธ์:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark
), YMMV - always do your own benchmarks:
1 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 90.40 90.59 91.17 0.42 90: 179.52 180.29 97.50 36.14 99: 187.33 186.69 186.82 0.05 99.7: 213.57 198.72 217.28 5.86 - -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 179.14 179.26 180.93 0.62 90: 183.49 183.36 185.92 0.92 99: 192.19 190.02 215.49 8.20 99.7: 240.70 228.16 258.88 8.24 -
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 86.05 85.60 86.24 0.50 90: 170.18 169.79 170.30 0.20 99: 176.83 176.58 177.09 0.19 99.7: 183.36 185.92 183.49 0.88 - -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.24 85.98 86.11 0.10 90: 89.89 89.44 89.63 0.14 99: 169.66 169.79 170.05 0.10 99.7: 175.42 176.32 176.45 0.05 -
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 691.46 699.65 701.18 0.15 90: 717.57 722.69 721.15 0.14 99: 752.90 748.29 748.29 0.00 99.7: 1872.38 1743.36 1780.22 1.39 - -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 350.59 353.66 353.41 0.05 90: 691.46 701.18 697.60 0.34 99: 732.42 733.95 729.34 0.42 99.7: 1377.79 1279.49 1302.02 1.16 -
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------ Percentile run1 run2 run3 % Variation 50: 342.40 344.96 344.45 0.10 90: 357.25 360.32 359.04 0.24 99: 688.38 691.97 691.46 0.05 99.7: 1376.77 1480.19 1383.94 4.43 - -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 343.68 345.47 346.24 0.15 90: 360.06 362.11 363.14 0.19 99: 694.02 698.62 699.14 0.05 99.7: 1400.32 1510.91 1435.14 3.40 -
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain
This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop
, you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 25 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 21 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 14 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal()
if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint
into the code because thread dumps are only reported at safe-points.
ผลลัพธ์:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | Throughput |
< 30 microseconds 99.3% of the time | 7 million message per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 milliseconds 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million message per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.