تتضمن RabbitMqBundle
المراسلة في تطبيقك عبر RabbitMQ باستخدام مكتبة php-amqplib.
تطبق الحزمة العديد من أنماط المراسلة كما هو موضح في مكتبة Thumper. ولذلك فإن نشر الرسائل إلى RabbitMQ من وحدة تحكم Symfony يعد أمرًا سهلاً كما يلي:
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
لاحقًا، عندما تريد استهلاك 50 رسالة من قائمة انتظار upload_pictures
، ما عليك سوى التشغيل على واجهة سطر الأوامر:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
تتوقع جميع الأمثلة وجود خادم RabbitMQ قيد التشغيل.
تم تقديم هذه الحزمة في مؤتمر Symfony Live Paris 2011. شاهد الشرائح هنا.
بسبب التغييرات الطارئة التي حدثت بسبب Symfony >=4.4، تم إصدار علامة جديدة، مما يجعل الحزمة متوافقة مع Symfony >=4.4.
تتطلب الحزمة وتبعياتها مع الملحن:
$ composer require php-amqplib/rabbitmq-bundle
تسجيل الحزمة:
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
يتمتع !
إذا كان لديك تطبيق وحدة تحكم يستخدم لتشغيل مستهلكي RabbitMQ، فلن تحتاج إلى Symfony HttpKernel و FrameworkBundle. من الإصدار 1.6، يمكنك استخدام مكون حقن التبعية لتحميل تكوين الحزمة والخدمات هذه، ثم استخدام أمر المستهلك.
اطلب الحزمة الموجودة في ملف Composer.json الخاص بك:
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
قم بتسجيل الامتداد وتمرير المترجم:
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
منذ 04-06-2012، تغيرت بعض الخيارات الافتراضية للتبادلات المعلنة في قسم تكوين "المنتجين" لتتوافق مع الإعدادات الافتراضية للتبادلات المعلنة في قسم "المستهلكين". الإعدادات المتأثرة هي:
durable
من false
إلى true
،auto_delete
من true
إلى false
.يجب تحديث التكوين الخاص بك إذا كنت تعتمد على القيم الافتراضية السابقة.
منذ 24-04-2012، تغير توقيع أسلوب ConsumerInterface::execute
منذ 03/01/2012، تحصل طريقة تنفيذ المستهلكين على كائن رسالة AMQP بالكامل وليس النص فقط. راجع ملف CHANGELOG لمزيد من التفاصيل.
أضف قسم old_sound_rabbit_mq
في ملف التكوين الخاص بك:
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
نقوم هنا بتكوين خدمة الاتصال ونقاط نهاية الرسالة التي سيحتوي عليها تطبيقنا. في هذا المثال، ستحتوي حاوية الخدمة الخاصة بك على الخدمة old_sound_rabbit_mq.upload_picture_producer
و old_sound_rabbit_mq.upload_picture_consumer
. ويتوقع الأحدث أن هناك خدمة تسمى upload_picture_service
.
إذا لم تحدد اتصالاً للعميل، فسيبحث العميل عن اتصال بنفس الاسم المستعار. لذا، بالنسبة إلى upload_picture
ستبحث حاوية الخدمة عن اتصال upload_picture
.
إذا كنت بحاجة إلى إضافة وسيطات قائمة انتظار اختيارية، فيمكن أن تكون خيارات قائمة الانتظار الخاصة بك شيئًا من هذا القبيل:
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
مثال آخر برسالة TTL مدتها 20 ثانية:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
يجب أن تكون قيمة الوسيطة قائمة بنوع البيانات والقيمة. أنواع البيانات الصالحة هي:
S
- سلسلةI
- عدد صحيحD
- عشريT
- الطوابع الزمنيةF
- الجدولA
المصفوفةt
- بول قم بتكييف arguments
وفقًا لاحتياجاتك.
إذا كنت تريد ربط قائمة الانتظار بمفاتيح توجيه محددة، فيمكنك الإعلان عنها في إعدادات المنتج أو المستهلك:
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
في بيئة Symfony، يتم تمهيد جميع الخدمات بالكامل لكل طلب، بدءًا من الإصدار >= 4.3، يمكنك إعلان الخدمة كخدمة كسولة (Lazy Services). لا تزال هذه الحزمة لا تدعم ميزة Lazy Services الجديدة ولكن يمكنك تعيين lazy: true
في تكوين الاتصال الخاص بك لتجنب الاتصالات غير الضرورية مع وسيط الرسائل الخاص بك في كل طلب. يوصى بشدة باستخدام الاتصالات البطيئة لأسباب تتعلق بالأداء، ومع ذلك يتم تعطيل الخيار البطيء افتراضيًا لتجنب الانقطاعات المحتملة في التطبيقات التي تستخدم هذه الحزمة بالفعل.
إنها فكرة جيدة أن تقوم بضبط read_write_timeout
على ضعف نبضات القلب بحيث يكون المقبس مفتوحًا. إذا لم تقم بذلك، أو استخدمت مضاعفًا مختلفًا، فهناك خطر انتهاء مهلة مأخذ توصيل المستهلك .
يرجى أن تضع في اعتبارك أنه يمكنك توقع حدوث مشكلات، إذا كانت مهامك بشكل عام تعمل لفترة أطول من فترة نبضات القلب، والتي لا توجد حلول جيدة لها (الرابط). فكر في استخدام قيمة كبيرة لنبضات القلب أو ترك نبضات القلب معطلة لصالح keepalive
برنامج التعاون الفني (سواء على جانب العميل أو الخادم) وميزة graceful_max_execution_timeout
.
يمكنك توفير مضيفين متعددين للاتصال. سيسمح لك هذا باستخدام مجموعة RabbitMQ مع عقد متعددة.
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
انتبه إلى أنه لا يمكنك تحديده
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
المعلمات لكل مضيف على حدة.
في بعض الأحيان قد تحتاج معلومات الاتصال الخاصة بك إلى أن تكون ديناميكية. تتيح لك معلمات الاتصال الديناميكي توفير المعلمات أو تجاوزها برمجيًا من خلال الخدمة.
على سبيل المثال، في السيناريو الذي تعتمد فيه معلمة vhost
الخاصة بالاتصال على المستأجر الحالي لتطبيقك ذو العلامة البيضاء ولا تريد (أو لا تستطيع) تغيير تكوينه في كل مرة.
حدد خدمة ضمن connection_parameters_provider
التي تنفذ ConnectionParametersProviderInterface
، وأضفها إلى تكوين connections
المناسب.
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
مثال التنفيذ:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
في هذه الحالة، سيتم تجاوز المعلمة vhost
بمخرجات getVhost()
.
في تطبيق المراسلة، تسمى عملية إرسال الرسائل إلى الوسيط بالمنتج بينما تسمى عملية تلقي هذه الرسائل بالمستهلك . في تطبيقك، سيكون لديك العديد منها والتي يمكنك إدراجها ضمن الإدخالات الخاصة بها في التكوين.
سيتم استخدام المنتج لإرسال الرسائل إلى الخادم. في نموذج AMQP، يتم إرسال الرسائل إلى البورصة ، وهذا يعني أنه في تكوين المنتج، سيتعين عليك تحديد خيارات الاتصال جنبًا إلى جنب مع خيارات التبادل، والتي عادة ما تكون اسم البورصة ونوعها.
لنفترض الآن أنك تريد معالجة تحميلات الصور في الخلفية. بعد نقل الصورة إلى موقعها النهائي، سوف تقوم بنشر رسالة إلى الخادم بالمعلومات التالية:
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
كما ترون، إذا كان لديك في التكوين الخاص بك منتج يسمى upload_picture ، فستكون لديك في حاوية الخدمة خدمة تسمى old_sound_rabbit_mq.upload_picture_producer .
إلى جانب الرسالة نفسها، يقبل الأسلوب OldSoundRabbitMqBundleRabbitMqProducer#publish()
أيضًا معلمة مفتاح توجيه اختيارية ومجموعة اختيارية من الخصائص الإضافية. تسمح لك مجموعة الخصائص الإضافية بتغيير الخصائص التي يتم من خلالها إنشاء كائن PhpAmqpLibMessageAMQPMessage
افتراضيًا. بهذه الطريقة، على سبيل المثال، يمكنك تغيير رؤوس التطبيقات.
يمكنك استخدام أساليب setContentType و setDeliveryMode لتعيين نوع محتوى الرسالة ووضع تسليم الرسالة على التوالي، وتجاوز أي مجموعة افتراضية في قسم تكوين "المنتجين". إذا لم يتم تجاوزها بواسطة تكوين "المنتجين" أو استدعاء صريح لهذه الأساليب (وفقًا للمثال أدناه)، فإن القيم الافتراضية هي نص/عادي لنوع المحتوى و 2 لوضع التسليم.
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
إذا كنت بحاجة إلى استخدام فئة مخصصة للمنتج (والتي يجب أن ترث من OldSoundRabbitMqBundleRabbitMqProducer
)، فيمكنك استخدام خيار class
:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
الجزء التالي من اللغز هو أن يكون لديك مستهلك يقوم بإخراج الرسالة من قائمة الانتظار ومعالجتها وفقًا لذلك.
يوجد حاليًا حدثان منبعثان من المنتج.
يقع هذا الحدث مباشرة قبل نشر الرسالة. يعد هذا رابطًا جيدًا لإجراء أي تسجيل نهائي أو التحقق من الصحة وما إلى ذلك قبل إرسال الرسالة فعليًا. نموذج تنفيذ للمستمع:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
يقع هذا الحدث مباشرة بعد نشر الرسالة. يعد هذا رابطًا جيدًا للقيام بأي تسجيل تأكيد أو التزامات أو ما إلى ذلك بعد إرسال الرسالة فعليًا. نموذج تنفيذ للمستمع:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
سيتصل المستهلك بالخادم ويبدأ حلقة في انتظار معالجة الرسائل الواردة. اعتمادًا على رد الاتصال المحدد لهذا المستهلك، سيكون السلوك الذي سيتبعه. دعونا نراجع تكوين المستهلك من الأعلى:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
كما نرى هناك، فإن خيار رد الاتصال يحتوي على إشارة إلى upload_picture_service . عندما يتلقى المستهلك رسالة من الخادم، فإنه سيقوم بتنفيذ رد الاتصال هذا. إذا كنت بحاجة إلى تحديد رد اتصال مختلف لأغراض الاختبار أو تصحيح الأخطاء، فيمكنك تغييره هناك.
وبصرف النظر عن رد الاتصال، فإننا نحدد أيضًا الاتصال الذي سيتم استخدامه، بنفس الطريقة التي نتبعها مع المنتج . الخيارات المتبقية هي Exchange_options و queue_options . يجب أن تكون خيارات التبادل هي نفس تلك المستخدمة للمنتج . في queue_options سوف نقدم اسم قائمة الانتظار . لماذا؟
كما قلنا، يتم نشر الرسائل الموجودة في AMQP في البورصة . هذا لا يعني أن الرسالة وصلت إلى قائمة الانتظار . لكي يحدث هذا، نحتاج أولاً إلى إنشاء قائمة الانتظار هذه ثم ربطها بالتبادل . والشيء الرائع في هذا هو أنه يمكنك ربط عدة قوائم انتظار بتبادل واحد، وبهذه الطريقة يمكن أن تصل رسالة واحدة إلى عدة وجهات. وميزة هذا النهج هو الانفصال عن المنتج والمستهلك. لا يهتم المنتج بعدد المستهلكين الذين سيعالجون رسائله. كل ما يحتاجه هو أن تصل رسالته إلى الخادم. وبهذه الطريقة يمكننا توسيع الإجراءات التي نقوم بها في كل مرة يتم فيها تحميل صورة دون الحاجة إلى تغيير التعليمات البرمجية في وحدة التحكم الخاصة بنا.
الآن، كيفية إدارة المستهلك؟ هناك أمر يمكن تنفيذه على النحو التالي:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
ماذا يعني هذا؟ نقوم بتنفيذ أمر upload_picture للمستهلك بأن يستهلك 50 رسالة فقط. في كل مرة يتلقى المستهلك رسالة من الخادم، فإنه سيتم تنفيذ رد الاتصال المكوّن بتمرير رسالة AMQP كمثيل للفئة PhpAmqpLibMessageAMQPMessage
. يمكن الحصول على نص الرسالة عن طريق استدعاء $msg->body
. افتراضيًا، سيقوم المستهلك بمعالجة الرسائل في حلقة لا نهاية لها للحصول على تعريف ما لا نهاية له .
إذا كنت تريد التأكد من أن العميل سينتهي من التنفيذ فورًا على إشارة Unix، فيمكنك تشغيل الأمر باستخدام العلامة -w
.
$ ./app/console rabbitmq:consumer -w upload_picture
ثم سينتهي المستهلك من التنفيذ على الفور.
لاستخدام الأمر مع هذه العلامة، تحتاج إلى تثبيت PHP بامتداد PCNTL.
إذا كنت تريد تحديد حد لذاكرة المستهلك، فيمكنك القيام بذلك باستخدام العلم -l
. في المثال التالي، تضيف هذه العلامة حدًا للذاكرة يبلغ 256 ميجابايت. سيتم إيقاف المستهلك بمقدار خمسة ميغابايت قبل أن يصل إلى 256 ميغابايت لتجنب حدوث خطأ في حجم الذاكرة المسموح بها في PHP.
$ ./app/console rabbitmq:consumer -l 256
إذا كنت تريد إزالة كافة الرسائل المنتظرة في قائمة الانتظار، فيمكنك تنفيذ هذا الأمر لتطهير قائمة الانتظار هذه:
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
لحذف قائمة انتظار المستهلك، استخدم هذا الأمر:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
يمكن أن يكون هذا مفيدًا في العديد من السيناريوهات. هناك 3 أحداث AMQPE:
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` والتحقق من نشر التطبيق الجديد.
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
تم رفع الحدث قبل معالجة AMQPMessage
.
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
تم رفع الحدث بعد معالجة AMQPMessage
. إذا كانت رسالة العملية ستطرح استثناءً، فلن يتم رفع الحدث.
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
تم رفع الحدث عند إنهاء طريقة wait
بعد انتهاء المهلة دون تلقي رسالة. للاستفادة من هذا الحدث، يجب تكوين idle_timeout
للمستهلك. بشكل افتراضي، يمكنك منع عملية الخروج عند انتهاء مهلة الخمول عن طريق تعيين $event->setForceStop(false)
في المستمع.
إذا كنت بحاجة إلى تعيين مهلة عندما لا تكون هناك رسائل من قائمة الانتظار الخاصة بك خلال فترة من الوقت، فيمكنك تعيين idle_timeout
بالثواني. يحدد idle_timeout_exit_code
رمز الخروج الذي يجب على المستهلك إرجاعه عند انتهاء مهلة الخمول. بدون تحديد ذلك، سيقوم المستهلك بطرح استثناء PhpAmqpLibExceptionAMQPTimeoutException
.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
اضبط timeout_wait
بالثواني. يحدد timeout_wait
المدة التي سينتظرها المستهلك دون تلقي رسالة جديدة قبل التأكد من أن الاتصال الحالي لا يزال صالحًا.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
إذا كنت تريد تشغيل المستهلك الخاص بك لفترة زمنية معينة ثم الخروج بأمان، فقم بتعيين graceful_max_execution.timeout
بالثواني. "الخروج بأمان" يعني أن المستهلك سيخرج إما بعد المهمة الجاري تشغيلها حاليًا أو فورًا، عند انتظار مهام جديدة. يحدد graceful_max_execution.exit_code
رمز الخروج الذي يجب على المستهلك إرجاعه عند انتهاء مهلة التنفيذ الأقصى. بدون تحديد ذلك، سيخرج المستهلك بالحالة 0
.
تعتبر هذه الميزة رائعة بالاقتران مع المشرف، والتي يمكن أن تسمح معًا بتنظيف تسرب الذاكرة بشكل دوري، والاتصال بقاعدة البيانات/تجديد rabbitmq والمزيد.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
ربما لاحظت أن الإرسال لا يزال لا يعمل تمامًا كما نريد. على سبيل المثال، في حالة وجود عاملين، عندما تكون جميع الرسائل الفردية ثقيلة وحتى الرسائل خفيفة، سيكون أحد العاملين مشغولاً باستمرار ولن يقوم الآخر بأي عمل تقريبًا. حسنًا، لا يعرف RabbitMQ أي شيء عن ذلك وسيظل يرسل الرسائل بالتساوي.
يحدث هذا لأن RabbitMQ يرسل رسالة فقط عندما تدخل الرسالة إلى قائمة الانتظار. ولا ينظر إلى عدد الرسائل غير المقبولة للمستهلك. إنه يرسل بشكل أعمى كل رسالة n إلى المستهلك n.
للتغلب على ذلك يمكننا استخدام طريقة basic.qos مع إعداد prefetch_count=1. هذا يخبر RabbitMQ بعدم إعطاء أكثر من رسالة واحدة للعامل في المرة الواحدة. أو بمعنى آخر، لا ترسل رسالة جديدة إلى العامل حتى يقوم بمعالجة الرسالة السابقة والإقرار بها. وبدلاً من ذلك، سيتم إرساله إلى العامل التالي الذي لا يزال غير مشغول.
من: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
كن حذرًا لأن تنفيذ التوزيع العادل يؤدي إلى زمن استجابة من شأنه الإضرار بالأداء (راجع منشور المدونة هذا). لكن تنفيذه يسمح لك بالتوسع أفقيًا ديناميكيًا مع زيادة قائمة الانتظار. يجب عليك تقييم القيمة الصحيحة لـ prefetch_size، كما توصي المدونة، وفقًا للوقت المستغرق لمعالجة كل رسالة وأداء شبكتك.
باستخدام RabbitMqBundle، يمكنك تكوين خيارات qos_options لكل مستهلك على النحو التالي:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
إذا تم استخدامه مع Symfony 4.2+ ، تعلن الحزمة في مجموعة الحاويات من الأسماء المستعارة للمنتجين والمستهلكين العاديين. يتم استخدامها للوسائط التلقائية بناءً على النوع المعلن واسم الوسيطة. يتيح لك هذا تغيير مثال المنتج السابق إلى:
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
يتم إنشاء اسم الوسيطة من اسم المنتج أو المستهلك من التكوين وملحق بكلمة المنتج أو المستهلك وفقًا للنوع. على عكس اصطلاح تسمية عناصر الحاوية، لن يتم تكرار لاحقة الكلمة (المنتج أو المستهلك) إذا كان الاسم مُلحقًا بالفعل. سيتم تغيير مفتاح منتج upload_picture
إلى اسم الوسيطة $uploadPictureProducer
. سيتم أيضًا تسمية مفتاح المنتج upload_picture_producer
باسم مستعار لاسم الوسيطة $uploadPictureProducer
. من الأفضل تجنب الأسماء المتشابهة بهذه الطريقة.
جميع المنتجين مستعارون لـ OldSoundRabbitMqBundleRabbitMqProducerInterface
وخيار فئة المنتج من التكوين. في وضع الحماية، يتم إنشاء الأسماء المستعارة ProducerInterface
فقط. يوصى بشدة باستخدام فئة ProducerInterface
عند كتابة وسائط تلميح لحقن المنتج.
جميع المستهلكين مستعارون لقيمة خيار التكوين OldSoundRabbitMqBundleRabbitMqConsumerInterface
و %old_sound_rabbit_mq.consumer.class%
. لا يوجد فرق بين الوضع العادي ووضع الحماية. يوصى بشدة باستخدام ConsumerInterface
عند كتابة وسيطات التلميح لإدخال العميل.
فيما يلي مثال على رد الاتصال:
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
كما ترون، هذا أمر بسيط مثل تنفيذ طريقة واحدة: ConsumerInterface::execute .
ضع في اعتبارك أن عمليات الاسترجاعات الخاصة بك يجب أن يتم تسجيلها كخدمات Symfony عادية. هناك يمكنك إدخال حاوية الخدمة، وخدمة قاعدة البيانات، ومسجل Symfony، وما إلى ذلك.
راجع https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md لمزيد من التفاصيل حول ما هو جزء من مثيل الرسالة.
لإيقاف المستهلك، يمكن لرد الاتصال طرح StopConsumerException
( لن تكون الرسالة الأخيرة المستهلكة ack) أو AckStopConsumerException
( ستكون الرسالة ack). في حالة استخدام شيطان، على سبيل المثال: المشرف، سيتم إعادة تشغيل المستهلك فعليًا.
يبدو أن هذا يتطلب الكثير من العمل لإرسال الرسائل فقط، فلنلخص الأمر للحصول على نظرة عامة أفضل. هذا ما نحتاجه لإنتاج/استهلاك الرسائل:
وهذا كل شيء!
كان هذا متطلبًا لإمكانية تتبع الرسائل المستلمة/المنشورة. لتمكين هذا، ستحتاج إلى إضافة تكوين enable_logger
إلى المستهلكين أو الناشرين.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
إذا كنت ترغب في ذلك، يمكنك أيضًا التعامل مع التسجيل من قوائم الانتظار باستخدام معالجات مختلفة في المونولوج، من خلال الرجوع إلى القناة phpamqplib
.
لقد قمنا حتى الآن بإرسال رسائل إلى المستهلكين، ولكن ماذا لو أردنا الحصول على رد منهم؟ ولتحقيق ذلك علينا تنفيذ مكالمات RPC في تطبيقنا. تجعل هذه الحزمة من السهل جدًا تحقيق مثل هذه الأشياء باستخدام Symfony.
دعونا نضيف عميل وخادم RPC إلى التكوين:
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
للحصول على مرجع التكوين الكامل، يرجى استخدام الأمر php app/console config:dump-reference old_sound_rabbit_mq
.
لدينا هنا خادم مفيد جدًا: فهو يُرجع أعدادًا صحيحة عشوائية لعملائه. سيكون رد الاتصال المستخدم لمعالجة الطلب هو خدمة Random_int_server . الآن دعونا نرى كيفية استدعائها من وحدات التحكم لدينا.
علينا أولاً تشغيل الخادم من سطر الأوامر:
$ ./app/console_dev rabbitmq:rpc-server random_int
ثم أضف الكود التالي إلى وحدة التحكم الخاصة بنا:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
كما ترون هناك، إذا كان معرف العميل الخاص بنا هو integer_store ، فسيكون اسم الخدمة old_sound_rabbit_mq.integer_store_rpc . بمجرد الحصول على هذا الكائن، نضع طلبًا على الخادم عن طريق استدعاء addRequest
الذي يتوقع ثلاث معلمات:
الوسيطات التي نرسلها هي القيم الدنيا والقصوى للدالة rand()
. نرسلهم عن طريق إجراء تسلسل لمصفوفة. إذا كان خادمنا يتوقع معلومات JSON، أو XML، فسنرسل هذه البيانات هنا.
القطعة الأخيرة هي الحصول على الرد. سيتم حظر برنامج PHP النصي الخاص بنا حتى يُرجع الخادم قيمة. سيكون المتغير $replies عبارة عن مصفوفة ترابطية حيث سيتم تضمين كل رد من الخادم في مفتاح request_id المعني.
افتراضيًا، يتوقع عميل RPC إجراء تسلسل للاستجابة. إذا قام الخادم الذي تعمل معه بإرجاع نتيجة غير متسلسلة، فقم بتعيين خيار expect_serialized_response
لعميل RPC على خطأ. على سبيل المثال، إذا لم يقم خادم integer_store بإجراء تسلسل للنتيجة، فسيتم تعيين العميل على النحو التالي:
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
يمكنك أيضًا تعيين انتهاء صلاحية للطلب بالمللي ثانية، وبعد ذلك لن تتم معالجة الرسالة بواسطة الخادم وستنتهي مهلة طلب العميل ببساطة. يعمل إعداد انتهاء صلاحية الرسائل فقط مع الإصدار 3.x من RabbitMQ والإصدارات الأحدث. تفضل بزيارة http://www.rabbitmq.com/ttl.html#per-message-ttl لمزيد من المعلومات.
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
كما يمكنك التخمين، يمكننا أيضًا إجراء مكالمات RPC متوازية .
لنفترض أنه لعرض بعض صفحات الويب، تحتاج إلى إجراء استعلامين لقاعدة البيانات، أحدهما يستغرق 5 ثوانٍ لإكماله والآخر يستغرق ثانيتين - استعلامات باهظة الثمن جدًا. إذا قمت بتنفيذها بشكل تسلسلي، فستكون صفحتك جاهزة للتسليم في حوالي 7 ثوانٍ. إذا قمت بتشغيلهما بالتوازي، فسيتم عرض صفحتك في حوالي 5 ثوانٍ. مع RabbitMqBundle
يمكننا إجراء مثل هذه المكالمات المتوازية بسهولة. دعونا نحدد عميلًا موازيًا في التكوين وخادم RPC آخر:
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
ثم يجب أن يذهب هذا الرمز إلى وحدة التحكم الخاصة بنا:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
يشبه إلى حد كبير المثال السابق، لدينا فقط استدعاء addRequest
إضافي. كما نقدم أيضًا معرفات طلب ذات معنى، لذا سيكون من الأسهل بالنسبة لنا العثور على الرد الذي نريده في مصفوفة $replies لاحقًا.
لتمكين الرد المباشر على العملاء، عليك فقط تمكين الخيار direct_reply_to في تكوين rpc_clients للعميل.
سيستخدم هذا الخيار قائمة الانتظار الزائفة amq.rabbitmq.reply-to عند إجراء مكالمات RPC. على خادم RPC ليست هناك حاجة لأي تعديل.
لدى RabbitMQ تنفيذ قائمة الانتظار ذات الأولوية في المركز اعتبارًا من الإصدار 3.5.0. يمكن تحويل أي قائمة انتظار إلى قائمة انتظار ذات أولوية باستخدام الوسائط الاختيارية المقدمة من العميل (ولكن، على عكس الميزات الأخرى التي تستخدم الوسائط الاختيارية، وليس السياسات). يدعم التنفيذ عددًا محدودًا من الأولويات: 255. ويوصى باستخدام القيم بين 1 و10. تحقق من الوثائق
إليك كيفية إعلان قائمة انتظار الأولوية
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
في حالة وجود قائمة انتظار upload-picture
قبل ذلك، يجب عليك حذف قائمة الانتظار هذه قبل تشغيل أمر rabbitmq:setup-fabric
لنفترض الآن أنك تريد إنشاء رسالة ذات أولوية عالية، فيجب عليك نشر الرسالة مع هذه المعلومات الإضافية
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
إنها ممارسة جيدة أن يكون لديك الكثير من قوائم الانتظار للفصل المنطقي. مع المستهلك البسيط، سيتعين عليك إنشاء عامل (مستهلك) واحد لكل قائمة انتظار وقد يكون من الصعب إدارته عند التعامل مع العديد من التطورات (هل نسيت إضافة سطر في التكوين الإشرافي الخاص بك؟). يعد هذا مفيدًا أيضًا لقوائم الانتظار الصغيرة حيث قد لا ترغب في أن يكون لديك عدد كبير من العاملين مثل قوائم الانتظار، وترغب في إعادة تجميع بعض المهام معًا دون فقدان المرونة ومبدأ الفصل.
يتيح لك العديد من المستهلكين التعامل مع حالة الاستخدام هذه من خلال الاستماع إلى قوائم انتظار متعددة على نفس المستهلك.
إليك كيفية تعيين المستهلك بقوائم انتظار متعددة:
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
يتم الآن تحديد رد الاتصال ضمن كل قائمة انتظار ويجب تنفيذ واجهة ConsumerInterface
مثل المستهلك البسيط. جميع خيارات queues-options
في المستهلك متاحة لكل قائمة انتظار.
انتبه إلى أن جميع قوائم الانتظار موجودة ضمن نفس التبادل، والأمر متروك لك لتعيين التوجيه الصحيح لعمليات الاسترجاعات.
إن queues_provider
هي خدمة اختيارية توفر قوائم الانتظار ديناميكيًا. يجب عليه تنفيذ QueuesProviderInterface
.
انتبه إلى أن موفري قوائم الانتظار مسؤولون عن الاستدعاءات المناسبة لـ setDequeuer
وأن عمليات الاسترجاعات قابلة للاستدعاء (وليست ConsumerInterface
). في حالة تنفيذ خدمة توفير قوائم الانتظار DequeuerAwareInterface
، تتم إضافة استدعاء setDequeuer
إلى تعريف الخدمة مع كون DequeuerInterface
حاليًا MultipleConsumer
.
قد تجد أن تطبيقك يحتوي على سير عمل معقد وتحتاج إلى ربط تعسفي. قد تتضمن سيناريوهات الربط التعسفي تبادلًا لتبادل الارتباطات عبر خاصية destination_is_exchange
.
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
سيعلن الأمر Rabbitmq:setup-fabric عن عمليات التبادل وقوائم الانتظار كما هو محدد في تكوينات المنتج والمستهلك والمستهلك المتعدد قبل إنشاء الارتباطات التعسفية الخاصة بك. ومع ذلك، لن يعلن Rabbitmq:setup-fabric عن قوائم انتظار الإضافة والتبادلات المحددة في الارتباطات. الأمر متروك لك للتأكد من الإعلان عن التبادلات/قوائم الانتظار.
في بعض الأحيان يتعين عليك تغيير تكوين المستهلك بسرعة. يتيح لك المستهلكون الديناميكيون تحديد خيارات قائمة انتظار المستهلكين برمجيًا، استنادًا إلى السياق.
على سبيل المثال، في السيناريو الذي يجب أن يكون فيه المستهلك المحدد مسؤولاً عن عدد ديناميكي من المواضيع ولا تريد (أو لا تستطيع) تغيير تكوينه في كل مرة.
حدد خدمة queue_options_provider
التي تنفذ QueueOptionsProviderInterface
، وأضفها إلى تكوين dynamic_consumers
الخاص بك.
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
مثال للاستخدام:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
في هذه الحالة، يعمل عميل proc_logs
على server1
ويمكنه تحديد خيارات قائمة الانتظار التي يستخدمها.
الآن، لماذا سنحتاج إلى مستهلكين مجهولين؟ يبدو هذا وكأنه تهديد عبر الإنترنت أو شيء من هذا القبيل... استمر في القراءة.
يوجد في AMQP نوع من التبادل يُسمى "الموضوع" حيث يتم توجيه الرسائل إلى قوائم الانتظار بناءً على -كما تخمن- موضوع الرسالة. يمكننا إرسال سجلات حول تطبيقنا إلى تبادل موضوعات RabbiMQ باستخدام اسم المضيف الذي تم إنشاء السجل فيه وخطورة هذا السجل كموضوع. سيكون نص الرسالة هو محتوى السجل وستكون مفاتيح التوجيه الخاصة بنا على النحو التالي:
نظرًا لأننا لا نريد ملء قوائم الانتظار بسجلات غير محدودة، ما يمكننا فعله هو أنه عندما نريد مراقبة النظام، يمكننا تشغيل مستهلك يقوم بإنشاء قائمة انتظار وإرفاقه بتبادل السجلات بناءً على موضوع ما، على سبيل المثال ، نود أن نرى جميع الأخطاء التي أبلغت عنها خوادمنا. سيكون مفتاح التوجيه شيئًا مثل: #.error . في مثل هذه الحالة، يتعين علينا التوصل إلى اسم قائمة الانتظار، وربطه بالتبادل، والحصول على السجلات، وإلغاء ربطه وحذف قائمة الانتظار. لحسن الحظ، يوفر AMPQ طريقة للقيام بذلك تلقائيًا إذا قمت بتوفير الخيارات الصحيحة عند الإعلان عن قائمة الانتظار وربطها. المشكلة هي أنك لا تريد أن تتذكر كل هذه الخيارات. لهذا السبب قمنا بتنفيذ نمط المستهلك المجهول .
عندما نبدأ مستهلكًا مجهولًا، فإنه سيهتم بمثل هذه التفاصيل وعلينا فقط أن نفكر في تنفيذ رد الاتصال عند وصول الرسائل. هل يطلق عليه اسم Anonymous لأنه لن يحدد اسم قائمة الانتظار، ولكنه سينتظر حتى يقوم RabbitMQ بتعيين اسم عشوائي له.
الآن، كيفية تكوين وتشغيل مثل هذا المستهلك؟
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
هناك نحدد اسم التبادل ونوعه مع رد الاتصال الذي يجب تنفيذه عند وصول الرسالة.
أصبح هذا المستهلك المجهول الآن قادرًا على الاستماع إلى المنتجين المرتبطين بنفس التبادل ومن نوع الموضوع :
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
لبدء مستهلك مجهول نستخدم الأمر التالي:
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
الخيار الجديد الوحيد مقارنة بالأوامر التي رأيناها من قبل هو الخيار الذي يحدد مفتاح التوجيه : -r '#.error'
.
في بعض الحالات، قد ترغب في الحصول على مجموعة من الرسائل ثم إجراء بعض المعالجة عليها جميعًا. سيسمح لك مستهلكو الدُفعات بتحديد المنطق لهذا النوع من المعالجة.
على سبيل المثال: تخيل أن لديك قائمة انتظار حيث تتلقى رسالة لإدراج بعض المعلومات في قاعدة البيانات، وتدرك أنه إذا قمت بإدراج دفعة واحدة فمن الأفضل بكثير إدراج واحدة تلو الأخرى.
حدد خدمة رد الاتصال التي تنفذ BatchConsumerInterface
وأضف تعريف المستهلك إلى التكوين الخاص بك.
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
ملاحظة : إذا تم ضبط خيار keep_alive
على true
، فسيتم تجاهل idle_timeout_exit_code
وتستمر عملية المستهلك.
يمكنك تنفيذ مستهلك الدفعة الذي سيقر جميع الرسائل في إرجاع واحد أو يمكنك التحكم في الرسالة التي يجب الإقرار بها.
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
كيفية تشغيل مستهلك الدفعة التالية:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
هام: لن يتوفر لدى BatchConsumers خيار -m|messages
هام: يمكن أن يتوفر لدى BatchConsumers أيضًا خيار -b|batches
إذا كنت تريد استهلاك عدد محدد فقط من الدُفعات ثم إيقاف المستهلك. قم بإعطاء عدد الدُفعات فقط إذا كنت تريد أن يتوقف المستهلك بعد استهلاك تلك الرسائل المجمعة!
يوجد أمر يقرأ البيانات من STDIN وينشرها في قائمة انتظار RabbitMQ. لاستخدامها أولاً، عليك تكوين خدمة producer
في ملف التكوين الخاص بك مثل هذا:
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
سيقوم هذا المنتج بنشر رسائل تحتوي على words
تبادل مباشر. بالطبع يمكنك تعديل التكوين حسب ما تريد.
لنفترض بعد ذلك أنك تريد نشر محتويات بعض ملفات XML بحيث تتم معالجتها بواسطة مجموعة من المستهلكين. يمكنك نشرها بمجرد استخدام أمر مثل هذا:
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
هذا يعني أنه يمكنك إنشاء منتجين باستخدام أوامر Unix البسيطة.
دعونا نحلل تلك الخطوط الملاحية المنتظمة:
$ find vendor/symfony/ -name " *.xml " -print0
سيجد هذا الأمر جميع ملفات .xml
داخل مجلد Symfony وسيقوم بطباعة اسم الملف. يتم بعد ذلك نقل كل من أسماء الملفات هذه إلى cat
عبر xargs
:
$ xargs -0 cat
وأخيرًا، ينتقل ناتج cat
مباشرة إلى منتجنا الذي يتم استدعاؤه على النحو التالي:
$ ./app/console rabbitmq:stdin-producer words
يستغرق الأمر وسيطة واحدة فقط وهي اسم المنتج كما قمت بتكوينه في ملف config.yml
الخاص بك.
الغرض من هذه الحزمة هو السماح لتطبيقك بإنتاج الرسائل ونشرها في بعض التبادلات التي قمت بتكوينها.
في بعض الحالات، وحتى إذا كان التكوين الخاص بك صحيحًا، فلن يتم توجيه الرسائل التي تنتجها إلى أي قائمة انتظار لعدم وجودها. يجب تشغيل المستهلك المسؤول عن استهلاك قائمة الانتظار حتى يتم إنشاء قائمة الانتظار.
قد يكون إطلاق أمر لكل مستهلك بمثابة كابوس عندما يكون عدد المستهلكين مرتفعًا.
من أجل إنشاء تبادلات وقوائم انتظار وارتباطات مرة واحدة والتأكد من أنك لن تفقد أي رسالة، يمكنك تشغيل الأمر التالي:
$ ./app/console rabbitmq:setup-fabric
عند الرغبة، يمكنك تكوين المستهلكين والمنتجين لديك ليفترضوا أن نسيج RabbitMQ محدد بالفعل. للقيام بذلك، قم بإضافة ما يلي إلى التكوين الخاص بك:
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
افتراضيًا، سيعلن المستهلك أو المنتج عن كل ما يحتاجه باستخدام RabbitMQ عند بدء تشغيله. كن حذرًا عند استخدام هذا، عندما لا يتم تحديد التبادلات أو قوائم الانتظار، ستكون هناك أخطاء. عندما تقوم بتغيير أي تكوين، فإنك تحتاج إلى تشغيل أمر setup-fabric أعلاه لإعلان التكوين الخاص بك.
للمساهمة، ما عليك سوى فتح طلب سحب باستخدام الكود الجديد الخاص بك مع الأخذ في الاعتبار أنه إذا قمت بإضافة ميزات جديدة أو تعديل الميزات الموجودة، فيجب عليك توثيق ما يفعلونه في هذا الملف التمهيدي. إذا قمت بكسر BC، فيجب عليك توثيق ذلك أيضًا. كما يجب عليك تحديث سجل التغيير. لذا:
راجع: Resources/meta/LICENSE.md
تعتمد بنية الحزمة والوثائق جزئيًا على ملف RedisBundle