هذه المكتبة عبارة عن تطبيق PHP نقي لبروتوكول AMQP 0-9-1. تم اختباره ضد Rabbitmq.
تم استخدام المكتبة لأمثلة PHP من RabbitMQ في العمل والدروس الرسمية للأرنب.
يرجى ملاحظة أن هذا المشروع يتم إصداره باستخدام مدونة سلوك المساهم. من خلال المشاركة في هذا المشروع ، فإنك توافق على الالتزام بشروطه.
بفضل Videlalvaro و PostalService14 لإنشاء php-amqplib
.
يتم الآن الحفاظ على الحزمة من قبل Ramūnas Dronga و Luke Bakken والعديد من مهندسي برنامج VMware الذين يعملون على RabbitMQ.
بدءًا من الإصدار 2.0 ، تستخدم هذه المكتبة AMQP 0.9.1
بشكل افتراضي ، وبالتالي يتطلب RabbitMQ 2.0 أو الإصدار الأحدث. عادةً ما لا تتطلب ترقيات الخادم أي تغييرات في رمز التطبيق لأن البروتوكول يتغير بشكل غير منتظم ولكن يرجى إجراء الاختبار الخاص بك قبل الترقية.
نظرًا لأن المكتبة تستخدم AMQP 0.9.1
أضفنا دعمًا لتمديدات RabbitMQ التالية:
تبادل لتبادل الروابط
ناك الأساسي
الناشر يؤكد
إلغاء المستهلك إخطار
يتم دعم الامتدادات التي تعدل الأساليب الحالية مثل alternate exchanges
.
Enqueue/AMQP-Lib هو غلاف متوافق مع AMQP.
Amqproxy هي مكتبة وكيل مع اتصال/تجميع القنوات. هذا يسمح بتقليل التوصيل والانفصال عن القناة عند استخدام PHP-AMQPlib ، مما يؤدي إلى استخدام أقل وحدة المعالجة المركزية من RabbitMQ.
تأكد من تثبيت الملحن ، ثم قم بتشغيل الأمر التالي:
يتطلب الملحن $ php-amqplib/php-amqplib
سيؤدي ذلك إلى جلب المكتبة وتبعياتها داخل مجلد البائع. ثم يمكنك إضافة ما يلي إلى ملفات .php لاستخدام المكتبة
require_once __dir __. '/Pendor/autoload.php' ؛
ثم تحتاج إلى use
الفئات ذات الصلة ، على سبيل المثال:
استخدم phpamqplibconnectionAmqpStreamConnection ؛ استخدم phpamqplibmessageamqpmessage ؛
مع تشغيل RabbitMQ مفتوحتين ، وفي الأول تنفذ الأوامر التالية لبدء المستهلك:
$ CD PHP-AMQPLIB/DEMO $ php amqp_consumer.php
ثم على المحطة الأخرى تفعل:
$ CD PHP-AMQPLIB/DEMO $ php amqp_publisher.php بعض النصوص للنشر
يجب أن ترى الرسالة التي تصل إلى العملية على المحطة الأخرى
ثم لإيقاف المستهلك ، أرسل إليه رسالة quit
:
$ php amqp_publisher.php quit
إذا كنت بحاجة إلى الاستماع إلى المقابس المستخدمة للاتصال بـ RabbitMQ ، فابحث عن المثال في المستهلك غير الحظر.
$ php amqp_consumer_non_blocking.php
يرجى الاطلاع على changelog لمزيد من المعلومات ما الذي تغير مؤخرًا.
http://php-amqplib.github.io/php-amqplib/
لعدم تكرار أنفسنا ، إذا كنت تريد معرفة المزيد عن هذه المكتبة ، فيرجى الرجوع إلى دروس RabbitMQ الرسمية.
amqp_ha_consumer.php
: تجريبي استخدام قوائم الانتظار المتطابقة.
amqp_consumer_exclusive.php
و amqp_publisher_exclusive.php
: تبادل Demos Fanout باستخدام قوائم قوائم حصرية.
amqp_consumer_fanout_{1,2}.php
و amqp_publisher_fanout.php
: تبادل DEMOS Fanout مع قوائم الانتظار المسماة.
amqp_consumer_pcntl_heartbeat.php
: استخدام مرسل نبضات القلب القائم على الإشارة.
basic_get.php
: العروض التوضيحية التي تحصل على رسائل من قوائم الانتظار باستخدام مكالمة GET BASIC GET AMQP.
إذا كان لديك مجموعة من العقد المتعددة التي يمكن للاتصال بها ، فيمكنك بدء اتصال بمجموعة من المضيفين. للقيام بذلك ، يجب عليك استخدام طريقة create_connection
الثابتة.
على سبيل المثال:
$ connection = AMQPStreamConnection :: create_connection ([[ ['host' => host1 ، 'port' => port ، 'user' => user ، 'password' => pass ، 'vhost' => vhost] ['host' => host2 ، 'port' => port ، 'user' => user ، 'password' => pass ، 'vhost' => vhost] ] ، خيارات $) ؛
سيحاول هذا الرمز الاتصال بـ HOST1
أولاً ، والاتصال بـ HOST2
إذا فشل الاتصال الأول. تقوم الطريقة بإرجاع كائن اتصال لأول اتصال ناجح. إذا فشلت جميع الاتصالات ، فسوف يلقي الاستثناء من محاولة الاتصال الأخيرة.
انظر demo/amqp_connect_multiple_hosts.php
لمزيد من الأمثلة.
دعنا نقول أن لديك عملية تنشئ مجموعة من الرسائل التي سيتم نشرها إلى نفس exchange
باستخدام نفس routing_key
وخيارات مثل mandatory
. ثم يمكنك الاستفادة من ميزة مكتبة batch_basic_publish
. يمكنك دفع رسائل مثل هذا:
$ msg = new AMQPMessage ($ msg_body) ؛ $ ch-> batch_basic_publish ($ msg ، $ exchange) ؛ $ msg2 = new amqpmessage ($ msg_body) ؛ $ ch-> batch_basic_pubublish ($ msg2 ، $ Exchange) ؛
ثم أرسل الدفعة مثل هذا:
$ ch-> publish_batch () ؛
دعنا نقول أن برنامجنا يحتاج إلى القراءة من ملف ثم نشر رسالة واحدة لكل سطر. اعتمادًا على حجم الرسالة ، سيتعين عليك تحديد متى من الأفضل إرسال الدفعة. يمكنك إرسالها كل 50 رسالة ، أو كل مائة. هذا متروك لك.
هناك طريقة أخرى لتسريع رسالتك النشر هي عن طريق إعادة استخدام مثيلات رسالة AMQPMessage
. يمكنك إنشاء رسالتك الجديدة مثل هذا:
$ properties = array ('content_type' => 'text/plain' ، 'delivery_mode' => amqpmessage :: delivery_mode_persistrict) ؛ $ msg = new amqpmessage ($ body ، $ properties) ؛ $ ch- تبادل)؛
الآن ، دعنا نقول أنه بينما تريد تغيير هيئة الرسائل للرسائل المستقبلية ، ستحتفظ بنفس الخصائص ، أي أن رسائلك ستظل text/plain
وستظل delivery_mode
AMQPMessage::DELIVERY_MODE_PERSISTENT
. إذا قمت بإنشاء مثيل AMQPMessage
جديد لكل رسالة منشورة ، فيجب إعادة ترميز هذه الخصائص بتنسيق AMQP الثنائي. يمكنك تجنب كل ذلك من خلال إعادة استخدام AMQPMessage
ثم إعادة تعيين جسم الرسالة مثل هذا:
$ msg-> setbody ($ body2) ؛ $ ch-> basic_publish ($ msg ، $ exchange) ؛
لا يفرض AMQP أي حد على حجم الرسائل ؛ إذا تم استلام رسالة كبيرة جدًا من قبل المستهلك ، فقد يتم الوصول إلى حد ذاكرة PHP داخل المكتبة قبل نقل رد الاتصال إلى basic_consume
.
لتجنب ذلك ، يمكنك استدعاء الطريقة AMQPChannel::setBodySizeLimit(int $bytes)
على مثيل القنوات الخاص بك. سيتم اقتطاع أحجام الجسم التي تتجاوز هذا الحد ، وتسليمها إلى رد الاتصال الخاص بك باستخدام علامة AMQPMessage::$is_truncated
true
. ستعكس الخاصية AMQPMessage::$body_size
حجم الجسم الحقيقي للرسالة المستلمة ، والتي ستكون أعلى من strlen(AMQPMessage::getBody())
إذا تم اقتطاع الرسالة.
لاحظ أن جميع البيانات أعلاه تتم قراءة الحد من قناة AMQP وتجاهلها على الفور ، لذلك لا توجد طريقة لاستردادها ضمن رد الاتصال الخاص بك. إذا كان لديك مستهلك آخر يمكنه التعامل مع الرسائل ذات الأحمال الكبيرة ، فيمكنك استخدام basic_reject
أو basic_nack
لإخبار الخادم (الذي لا يزال لديه نسخة كاملة) لإعادة توجيهه إلى تبادل الحروف الميتة.
بشكل افتراضي ، لن يحدث أي اقتطاع. لتعطيل الاقتطاع على قناة جعلتها ممكّنة ، تمرير 0
(أو null
) إلى AMQPChannel::setBodySizeLimit()
.
بعض عملاء RABBITMQ يستخدمون آليات استرداد الاتصال الآلي لإعادة توصيل القنوات والمستهلكين واستعادةها في حالة وجود أخطاء في الشبكة.
نظرًا لأن هذا العميل يستخدم سلسلة واحدة ، يمكنك إعداد استرداد الاتصال باستخدام آلية معالجة الاستثناءات.
الاستثناءات التي قد يتم إلقاؤها في حالة أخطاء الاتصال:
phpamqplibexceptionamqpconnectionClosexceptionPamqpleBexceptionAmqpioExceptionRuntimeExceptionErrorexception
قد يتم إلقاء بعض الاستثناءات الأخرى ، ولكن لا يزال هناك اتصال. من الجيد دائمًا تنظيف اتصال قديم عند التعامل مع استثناء قبل إعادة الاتصال.
على سبيل المثال ، إذا كنت ترغب في إعداد اتصال استرداد:
$ connection = null ؛ $ channel = null ؛ بينما (true) {try {$ connection = new AmqPstreamConnection (المضيف ، المنفذ ، المستخدم ، الممر ، vhost) ؛ // يذهب رمز التطبيق الخاص بك هنا. } catch (amqpruntimeexception $ e) {echo $ e-> getMessage () ؛ cleanup_connection ($ connection) ؛ uSleep (wait_before_reconnect_us) ؛ } catch (runTimeException $ e) {cleanup_connection ($ connection) ؛ uSleep (wait_before_reconnect_us) ؛ } catch (errorexception $ e) {cleanup_connection ($ connection) ؛ usleep (wait_before_reconnect_us) ؛ } }
مثال كامل هو في demo/connection_recovery_consume.php
.
سيقوم هذا الرمز بإعادة توصيل رمز التطبيق وإعادة إعادةه فيه في كل مرة يحدث فيها الاستثناء. لا يزال من الممكن إلقاء بعض الاستثناءات ولا ينبغي التعامل معها كجزء من عملية إعادة الاتصال ، لأنها قد تكون أخطاء في التطبيق.
هذا النهج منطقي في الغالب لتطبيقات المستهلكين ، سيتطلب المنتجون بعض رمز التطبيق الإضافي لتجنب نشر نفس الرسالة عدة مرات.
كان هذا مثالًا أبسط ، في تطبيق حقيقي قد ترغب في التحكم في عدد Retr Count وربما يتحلل وقت الانتظار بأمان لإعادة الاتصال.
يمكنك العثور على مثال أكثر مفرطة في #444
إذا كنت قد قمت بتثبيت تمديد تمديد PCNTL ، فسيتم التعامل مع الإشارة عندما لا يقوم المستهلك بمعالجة رسالة.
$ pcntlhandler = function ($ signal) {switch ($ signal) {case sigterm: case sigusr1: case sigint: // بعض الأشياء قبل إيقاف قفل المستهلك eg etcpcntl_signal (إشارة $ ، sig_dfl) ؛ // استعادة HandlerPosix_Kill (posix_getpid () ، إشارة $) ؛ // اقتل الذات مع الإشارة ، راجع https://www.cons.org/cracauer/sigint.htmlcase sinup: // بعض الأشياء لإعادة تشغيل المستهلك ؛ الافتراضي: // لا شيء} } ؛ pcntl_signal (sigterm ، $ pcntlhandler) ؛ pcntl_signal (sigint ، $ pcntlhandler) ؛ pcntl_signal (sigusr1 ، $ pcntlhandler) ؛ pcntl_signal (تنهد ، $ pcntlhandler) ؛
لتعطيل هذه الميزة ، فقط حدد AMQP_WITHOUT_SIGNALS
كما true
<؟ phpdefine ('amqp_without_signals' ، true) ؛ ... المزيد من الكود
إذا قمت بتثبيت امتداد PCNTL واستخدمت PHP 7.1 أو أكثر ، فيمكنك تسجيل مرسل نبضات القلب المستند إلى الإشارة.
<؟ php $ sender = new pcntlheartbeatsender ($ connection) ؛ $ sender-> register () ؛ ... code $ sender-> unregister () ؛
إذا كنت تريد معرفة ما يجري على مستوى البروتوكول ، فأضف الثابت التالي إلى الكود الخاص بك:
<؟ phpdefine ('amqp_debug' ، true) ؛ ... المزيد من الكود؟>
لتشغيل النشر/الاستهلاك نوع القياس:
$ جعل معيار
يرجى الاطلاع على المساهمة للحصول على التفاصيل.
إذا كنت لا تزال ترغب في استخدام الإصدار القديم من البروتوكول ، فيمكنك القيام بذلك عن طريق تعيين الثابت التالي في رمز التكوين الخاص بك:
DEFINE ('AMQP_PROTOCOL' ، '0.8') ؛
القيمة الافتراضية هي '0.9.1'
.
إذا كنت لا ترغب في استخدام الملحن لسبب ما ، فأنت بحاجة إلى وجود adoloader في مكانه في فصول المكتبة. وقد أبلغ الناس استخدام هذا التحميل التلقائي مع النجاح.
فيما يلي محتوى ملف ReadMe الأصلي. اعتمادات تذهب إلى المؤلفين الأصليين.
PHP Library تنفيذ بروتوكول قائمة انتظار الرسائل المتقدمة (AMQP).
المكتبة هي منفذ Python Code of Py-Amqplib http://barryp.org/software/py-amqplib/
تم اختباره مع خادم RabbitMQ.
الصفحة الرئيسية للمشروع: http://code.google.com/p/php-amqplib/
للمناقشة ، يرجى الانضمام إلى المجموعة:
http://groups.google.com/group/php-amqplib-devel
لتقارير الأخطاء ، يرجى استخدام نظام تتبع الأخطاء في صفحة المشروع.
بقع مرحب به للغاية!
المؤلف: Vadim Zaliva [email protected]