PHP-rdkafka هو عميل Kafka مستقر وجاهز للإنتاج وسريع لـ PHP يعتمد على librdkafka.
الإصدار الحالي يدعم PHP >= 8.1.0، librdkafka >= 1.5.3، كافكا >= 0.8. الإصدار 6.x يدعم PHP 7.x..8.x، librdkafka 0.11..2.x. الإصدارات الأقدم تدعم PHP 5.
الهدف من الامتداد هو أن يكون ربط librdkafka منخفض المستوى وغير خاضع لرأي ويركز على الإنتاج والدعم طويل المدى.
يتم دعم واجهات برمجة التطبيقات للمستهلكين والمنتجين والبيانات التعريفية ذات المستوى العالي والمنخفض.
الوثائق متاحة هنا.
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
يمكن العثور على معلمات التكوين المستخدمة أدناه في مرجع تكوين Librdkafka
للإنتاج، نحتاج أولاً إلى إنشاء منتج، وإضافة وسطاء (خوادم كافكا) إليه:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
تحذير تأكد من أن المنتج الخاص بك يتبع إيقاف التشغيل المناسب (انظر أدناه) حتى لا تفقد الرسائل.
بعد ذلك، نقوم بإنشاء مثيل موضوع من المنتج:
<?php
$ topic = $ rk -> newTopic ( " test " );
ومن هناك، يمكننا إنتاج أكبر عدد ممكن من الرسائل، باستخدام طريقة الإنتاج:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
الحجة الأولى هي التقسيم. RD_KAFKA_PARTITION_UA يرمز إلى غير مُخصص ، ويتيح لـ librdkafka اختيار القسم.
الوسيطة الثانية هي إشارات الرسائل ويجب أن تكون إما 0
أو RD_KAFKA_MSG_F_BLOCK
لحظر الإنتاج في قائمة الانتظار الكاملة. يمكن أن تكون حمولة الرسالة أي شيء.
يجب أن يتم ذلك قبل تدمير مثيل المنتج
للتأكد من اكتمال جميع طلبات المنتجات الموجودة في قائمة الانتظار وعلى متن الطائرة
قبل الانتهاء. استخدم قيمة معقولة لـ $timeout_ms
.
تحذير عدم الاتصال بالتدفق يمكن أن يؤدي إلى فقدان الرسالة!
$ rk -> flush ( $ timeout_ms );
في حال كنت لا تهتم بإرسال الرسائل التي لم يتم إرسالها بعد، يمكنك استخدام purge()
قبل استدعاء flush()
:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
تدعم فئة RdKafkaKafkaConsumer تعيين/إلغاء القسم تلقائيًا. انظر المثال هنا.
ملاحظة: المستهلك ذو المستوى المنخفض هو واجهة برمجة تطبيقات قديمة، يرجى تفضيل استخدام المستهلك عالي المستوى
نحتاج أولاً إلى إنشاء مستهلك منخفض المستوى، وإضافة وسطاء (خوادم كافكا) إليه:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
بعد ذلك، قم بإنشاء نسخة موضوع عن طريق استدعاء الأسلوب newTopic()
، وابدأ في الاستهلاك على القسم 0:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
بعد ذلك، قم باسترداد الرسائل المستهلكة:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
ملاحظة: المستهلك ذو المستوى المنخفض هو واجهة برمجة تطبيقات قديمة، يرجى تفضيل استخدام المستهلك عالي المستوى
يمكن إجراء الاستهلاك من مواضيع و/أو أقسام متعددة عن طريق إخبار librdkafka بإعادة توجيه جميع الرسائل من هذه المواضيع/الأقسام إلى قائمة انتظار داخلية، ثم الاستهلاك من قائمة الانتظار هذه:
إنشاء قائمة الانتظار:
<?php
$ queue = $ rk -> newQueue ();
إضافة أقسام الموضوع إلى قائمة الانتظار:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
بعد ذلك، قم باسترداد الرسائل المستهلكة من قائمة الانتظار:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka لكل مخازن الإزاحة الافتراضية على الوسيط.
إذا كنت تستخدم ملفًا محليًا لتخزين الأوفست، فسيتم إنشاء الملف افتراضيًا في الدليل الحالي، باسم يعتمد على الموضوع والقسم. يمكن تغيير الدليل عن طريق تعيين خاصية التكوين offset.store.path
.
للتحكم يدويًا في الإزاحة، قم بتعيين enable.auto.offset.store
على false
.
سيتم التحكم في الإعدادات auto.commit.interval.ms
و auto.commit.enable
ما إذا كان سيتم الالتزام تلقائيًا بالإزاحات المخزنة للوسيط وفي أي فترة زمنية.
للتحكم يدويًا في الإزاحة، قم بتعيين enable.auto.commit
على false
.
الحد الأقصى للوقت المسموح به بين المكالمات لاستهلاك الرسائل للمستهلكين رفيعي المستوى.
إذا تم تجاوز هذه الفترة، يعتبر المستهلك فاشلاً وستفعل المجموعة ذلك
إعادة التوازن من أجل إعادة تعيين الأقسام لعضو آخر في مجموعة المستهلكين.
يعتبر group.id
مسؤولاً عن تعيين معرف مجموعة المستهلكين الخاصة بك ويجب أن يكون فريدًا (ولا يجب أن يتغير). يستخدمه كافكا للتعرف على التطبيقات وتخزين الإزاحات الخاصة بها.
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
مرجع تكوين Librdkafka
سيقوم librdkafka بتخزين ما يصل إلى 1 جيجابايت من الرسائل لكل قسم مستهلك افتراضيًا. يمكنك تقليل استخدام الذاكرة عن طريق تقليل قيمة المعلمة queued.max.messages.kbytes
على المستهلكين.
سيقوم كل مثيل للمستهلك والمنتج بإحضار بيانات تعريف المواضيع على فترات زمنية تحددها المعلمة topic.metadata.refresh.interval.ms
. اعتمادًا على إصدار librdkafka لديك، تكون المعلمة الافتراضية هي 10 ثوانٍ، أو 600 ثانية.
تقوم librdkafka بجلب البيانات الوصفية لجميع موضوعات المجموعة بشكل افتراضي. يؤدي تعيين topic.metadata.refresh.sparse
إلى السلسلة "true"
إلى التأكد من أن librdkafka يجلب فقط المواضيع التي يستخدمها.
يمكن أن يؤدي تعيين topic.metadata.refresh.sparse
على "true"
وضبط topic.metadata.refresh.interval.ms
إلى 600 ثانية (بالإضافة إلى بعض الارتعاش) إلى تقليل عرض النطاق الترددي كثيرًا، اعتمادًا على عدد المستهلكين والموضوعات.
يسمح هذا الإعداد بإنهاء سلاسل رسائل librdkafka بمجرد انتهاء librdkafka منها. يسمح هذا بشكل فعال لعمليات/طلبات PHP الخاصة بك بالإنهاء بسرعة.
عند تمكين هذا، يجب عليك إخفاء الإشارة مثل هذا:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
الحد الأقصى للوقت الذي قد يتم حظره بواسطة عملية مأخذ توصيل الوسيط. تعمل القيمة الأقل على تحسين الاستجابة على حساب استخدام وحدة المعالجة المركزية (CPU) بشكل أعلى قليلاً.
يؤدي تقليل قيمة هذا الإعداد إلى تحسين سرعة إيقاف التشغيل. تحدد القيمة الحد الأقصى للوقت الذي سيحظره librdkafka في تكرار واحد لحلقة القراءة. يحدد هذا أيضًا عدد المرات التي سيتحقق فيها مؤشر ترابط librdkafka الرئيسي من الإنهاء.
يحدد هذا الحد الأقصى والوقت الافتراضي الذي سينتظره librdkafka قبل إرسال مجموعة من الرسائل. يؤدي تقليل هذا الإعداد إلى 1 مللي ثانية على سبيل المثال إلى ضمان إرسال الرسائل في أسرع وقت ممكن، بدلاً من تجميعها على دفعات.
وقد لوحظ أن هذا يقلل من وقت إيقاف تشغيل مثيل rdkafka وعملية/طلب PHP.
فيما يلي تكوين مُحسّن لزمن الوصول المنخفض. يتيح ذلك لعملية / طلب PHP إرسال الرسائل في أسرع وقت ممكن وإنهاءها بسرعة.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
يُنصح باستدعاء الاستطلاع على فترات زمنية منتظمة لخدمة عمليات الاسترجاعات. في php-rdkafka:3.x
تم أيضًا استدعاء الاستقصاء أثناء إيقاف التشغيل، لذلك قد لا يتم الاتصال به على فترات منتظمة
يؤدي إلى إيقاف تشغيل أطول قليلاً. المثال أدناه يقوم بالاستقصاء حتى لا يكون هناك المزيد من الأحداث في قائمة الانتظار:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
يمكن العثور على مصدر الوثائق هنا
إذا لم تكن الوثائق كافية، فلا تتردد في طرح الأسئلة على قنوات php-rdkafka على Gitter أو مجموعات Google.
نظرًا لأن IDE الخاص بك غير قادر على اكتشاف php-rdkadka api تلقائيًا، يمكنك التفكير في استخدام حزمة خارجية توفر مجموعة من بذرة لفئات php-rdkafka ووظائفها وثوابتها: kwn/php-rdkafka-stubs
اذا اردت المساهمة فلك جزيل الشكر :)
قبل البدء، يرجى إلقاء نظرة على مستند المساهمة لمعرفة كيفية دمج تغييراتك.
التوثيق منسوخ من librdkafka.
المؤلفون: انظر المساهمين.
تم إصدار php-rdkafka بموجب ترخيص MIT.