PHP-rdkafka เป็นไคลเอนต์ Kafka ที่เสถียร พร้อมใช้งานจริง และ รวดเร็ว สำหรับ PHP ที่ใช้ librdkafka
เวอร์ชันปัจจุบันรองรับ PHP >= 8.1.0, librdkafka >= 1.5.3, Kafka >= 0.8 เวอร์ชัน 6.x รองรับ PHP 7.x..8.x, librdkafka 0.11..2.x เวอร์ชันเก่ารองรับ PHP 5
เป้าหมายของการขยายคือการผูกมัด librdkafka ที่ไม่มีความเห็นในระดับต่ำโดยเน้นไปที่การผลิตและการสนับสนุนระยะยาว
รองรับ Consumer , Producer และ API ข้อมูลเมตา ระดับสูงและต่ำ
เอกสารมีอยู่ที่นี่
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
พารามิเตอร์การกำหนดค่าที่ใช้ด้านล่างนี้สามารถพบได้ในข้อมูลอ้างอิงการกำหนดค่า Librdkafka
สำหรับการผลิต ก่อนอื่นเราต้องสร้างผู้ผลิตและเพิ่มโบรกเกอร์ (เซิร์ฟเวอร์ Kafka) ลงไป:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
คำเตือน ตรวจสอบ ให้แน่ใจว่าผู้ผลิตของคุณปฏิบัติตามการปิดระบบอย่างเหมาะสม (ดูด้านล่าง) เพื่อไม่ให้ข้อความสูญหาย
ต่อไป เราสร้างอินสแตนซ์หัวข้อจากผู้ผลิต:
<?php
$ topic = $ rk -> newTopic ( " test " );
จากนั้น เราสามารถสร้างข้อความได้มากเท่าที่ต้องการ โดยใช้วิธี ผลิต:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
อาร์กิวเมนต์แรกคือพาร์ติชัน RD_KAFKA_PARTITION_UA ย่อมาจาก unsigned และให้ librdkafka เลือกพาร์ติชัน
อาร์กิวเมนต์ที่สองคือแฟล็กข้อความ และควรเป็น 0 อย่างใดอย่างหนึ่ง
หรือ RD_KAFKA_MSG_F_BLOCK
เพื่อบล็อกการผลิตในคิวเต็ม เพย์โหลดข้อความสามารถเป็นอะไรก็ได้
สิ่งนี้ควรทำก่อนที่จะทำลายอินสแตนซ์ของผู้ผลิต
เพื่อให้แน่ใจว่าคำขอผลิตผลที่อยู่ในคิวและในเที่ยวบินทั้งหมดเสร็จสมบูรณ์
ก่อนที่จะยุติ ใช้ค่าที่เหมาะสมสำหรับ $timeout_ms
คำเตือน การไม่โทรฟลัชอาจทำให้ข้อความสูญหายได้!
$ rk -> flush ( $ timeout_ms );
ในกรณีที่คุณไม่สนใจเกี่ยวกับการส่งข้อความที่ยังไม่ได้ส่ง คุณสามารถใช้ purge()
ก่อนที่จะเรียก flush()
:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
คลาส RdKafkaKafkaConsumer รองรับการกำหนด/เพิกถอนพาร์ติชันอัตโนมัติ ดูตัวอย่างที่นี่
หมายเหตุ ผู้บริโภคระดับต่ำคือ API ดั้งเดิม โปรดเลือกใช้ผู้บริโภคระดับสูง
ก่อนอื่นเราต้องสร้างผู้บริโภคระดับต่ำและเพิ่มโบรกเกอร์ (เซิร์ฟเวอร์คาฟคา) เข้าไป:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
ถัดไป สร้างอินสแตนซ์หัวข้อโดยการเรียกใช้เมธอด newTopic()
และเริ่มใช้งานบนพาร์ติชัน 0:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
ถัดไป ดึงข้อความที่ใช้:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
หมายเหตุ ผู้บริโภคระดับต่ำคือ API ดั้งเดิม โปรดเลือกใช้ผู้บริโภคระดับสูง
การใช้จากหลายหัวข้อและ/หรือพาร์ติชันสามารถทำได้โดยการบอกให้ librdkafka ส่งต่อข้อความทั้งหมดจากหัวข้อ/พาร์ติชันเหล่านี้ไปยังคิวภายใน จากนั้นจึงใช้จากคิวนี้:
การสร้างคิว:
<?php
$ queue = $ rk -> newQueue ();
การเพิ่มพาร์ติชันหัวข้อลงในคิว:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
ถัดไป ดึงข้อความที่ใช้ไปจากคิว:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka ตามค่าเริ่มต้นร้านค้าจะชดเชยกับนายหน้า
หากคุณใช้ไฟล์ในเครื่องสำหรับการจัดเก็บออฟเซ็ต ตามค่าเริ่มต้น ไฟล์จะถูกสร้างขึ้นในไดเร็กทอรีปัจจุบัน โดยมีชื่อตามหัวข้อและพาร์ติชัน ไดเร็กทอรีสามารถเปลี่ยนแปลงได้โดยการตั้งค่าคุณสมบัติการกำหนดค่า offset.store.path
หากต้องการควบคุมออฟเซ็ตด้วยตนเอง ให้ตั้งค่า enable.auto.offset.store
เป็น false
การตั้งค่า auto.commit.interval.ms
และ auto.commit.enable
จะควบคุม
หากออฟเซ็ตที่เก็บไว้จะถูกมอบให้กับนายหน้าโดยอัตโนมัติและในช่วงเวลาใด
หากต้องการควบคุมออฟเซ็ตด้วยตนเอง ให้ตั้งค่า enable.auto.commit
เป็น false
เวลาสูงสุดที่อนุญาตระหว่างการโทรเพื่อใช้ข้อความสำหรับผู้ใช้ระดับสูง
หากเกินช่วงเวลานี้ ผู้ใช้บริการจะถือว่าล้มเหลวและกลุ่มจะดำเนินการ
ปรับสมดุลเพื่อกำหนดพาร์ติชันใหม่ให้กับสมาชิกกลุ่มผู้บริโภครายอื่น
group.id
มีหน้าที่รับผิดชอบในการตั้งค่ารหัสกลุ่มผู้บริโภคของคุณและควรไม่ซ้ำกัน (และไม่ควรเปลี่ยนแปลง) Kafka ใช้เพื่อจดจำแอปพลิเคชันและจัดเก็บออฟเซ็ตไว้
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
ข้อมูลอ้างอิงการกำหนดค่า Librdkafka
librdkafka จะบัฟเฟอร์ข้อความสูงสุด 1GB สำหรับแต่ละพาร์ติชันที่ใช้ตามค่าเริ่มต้น คุณสามารถลดการใช้หน่วยความจำได้โดยการลดค่าของพารามิเตอร์ queued.max.messages.kbytes
บน Consumer ของคุณ
อินสแตนซ์ผู้บริโภคและผู้ผลิตแต่ละรายการจะดึงข้อมูลเมตาของหัวข้อในช่วงเวลาที่กำหนดโดยพารามิเตอร์ topic.metadata.refresh.interval.ms
ขึ้นอยู่กับเวอร์ชัน librdkafka ของคุณ พารามิเตอร์จะมีค่าเริ่มต้นเป็น 10 วินาทีหรือ 600 วินาที
librdkafka ดึงข้อมูลเมตาสำหรับหัวข้อทั้งหมดของคลัสเตอร์ตามค่าเริ่มต้น การตั้งค่า topic.metadata.refresh.sparse
เป็นสตริง "true"
ทำให้แน่ใจว่า librdkafka ดึงข้อมูลเฉพาะหัวข้อที่เขาใช้
การตั้งค่า topic.metadata.refresh.sparse
เป็น "true"
และ topic.metadata.refresh.interval.ms
เป็น 600 วินาที (บวกกับความกระวนกระวายใจบางส่วน) สามารถลดแบนด์วิดท์ได้มาก ขึ้นอยู่กับจำนวนผู้บริโภคและหัวข้อ
การตั้งค่านี้อนุญาตให้เธรด librdkafka ยุติทันทีที่ librdkafka ใช้งานเธรดเสร็จ สิ่งนี้ทำให้กระบวนการ / คำขอ PHP ของคุณยุติลงอย่างรวดเร็วอย่างมีประสิทธิภาพ
เมื่อเปิดใช้งานสิ่งนี้ คุณจะต้องปกปิดสัญญาณดังนี้:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
เวลาสูงสุดที่การดำเนินการซ็อกเก็ตของนายหน้าอาจบล็อก ค่าที่ต่ำกว่าจะปรับปรุงการตอบสนองโดยเสียค่าใช้จ่ายในการใช้งาน CPU ที่สูงขึ้นเล็กน้อย
การลดค่าของการตั้งค่านี้จะช่วยปรับปรุงความเร็วในการปิดเครื่อง ค่านี้จะกำหนดเวลาสูงสุดที่ librdkafka จะบล็อกในการวนซ้ำของลูปการอ่านหนึ่งครั้ง นอกจากนี้ยังกำหนดความถี่ที่เธรด librdkafka หลักจะตรวจสอบการสิ้นสุดด้วย
นี่เป็นการกำหนดเวลาสูงสุดและค่าเริ่มต้นที่ librdkafka จะรอก่อนที่จะส่งชุดข้อความ การลดการตั้งค่านี้ลงเช่น 1ms ช่วยให้มั่นใจได้ว่าข้อความจะถูกส่งโดยเร็วที่สุด แทนที่จะส่งเป็นชุด
สิ่งนี้ได้รับการเห็นว่าลดเวลาการปิดระบบของอินสแตนซ์ rdkafka และกระบวนการ / คำขอ PHP
นี่คือการกำหนดค่าที่ได้รับการปรับให้เหมาะสมเพื่อให้มีเวลาแฝงต่ำ สิ่งนี้ทำให้กระบวนการ / คำขอ PHP ส่งข้อความโดยเร็วที่สุดและยุติอย่างรวดเร็ว
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
ขอแนะนำให้โทรสำรวจความคิดเห็นเป็นระยะๆ เพื่อให้บริการโทรกลับ ใน php-rdkafka:3.x
การสำรวจความคิดเห็นยังถูกเรียกระหว่างการปิดระบบ ดังนั้นการไม่เรียกมันในช่วงเวลาปกติอาจเกิดขึ้นได้
ทำให้เกิดการปิดเครื่องนานขึ้นเล็กน้อย ตัวอย่างด้านล่างจะสำรวจจนกว่าจะไม่มีเหตุการณ์ในคิวอีกต่อไป:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
สามารถดูแหล่งที่มาของเอกสารได้ที่นี่
หากเอกสารประกอบไม่เพียงพอ โปรดอย่าลังเลที่จะถามคำถามในช่อง php-rdkafka บน Gitter หรือ Google Groups
เนื่องจาก IDE ของคุณไม่สามารถค้นหา php-rdkadka api ได้โดยอัตโนมัติ คุณสามารถพิจารณาใช้แพ็คเกจภายนอกที่มีชุด stubs สำหรับคลาส ฟังก์ชัน และค่าคงที่ของ php-rdkafka: kwn/php-rdkafka-stubs
หากคุณต้องการมีส่วนร่วมขอขอบคุณ :)
ก่อนที่คุณจะเริ่มต้น โปรดดูเอกสารการมีส่วนร่วมเพื่อดูว่าจะรวมการเปลี่ยนแปลงของคุณเข้าด้วยกันได้อย่างไร
คัดลอกเอกสารจาก librdkafka
ผู้เขียน: ดูผู้ร่วมให้ข้อมูล
php-rdkafka เปิดตัวภายใต้ลิขสิทธิ์ MIT