RabbitMqBundle
включает обмен сообщениями в ваше приложение через RabbitMQ с использованием библиотеки php-amqplib.
Пакет реализует несколько шаблонов обмена сообщениями, как показано в библиотеке Thumper. Поэтому публиковать сообщения в RabbitMQ из контроллера Symfony так же просто, как:
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
Позже, когда вы захотите получить 50 сообщений из очереди upload_pictures
, вы просто запустите CLI:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Во всех примерах предполагается работающий сервер RabbitMQ.
Этот пакет был представлен на конференции Symfony Live Paris 2011. Посмотрите слайды здесь.
Из-за критических изменений, вызванных Symfony >=4.4, был выпущен новый тег, делающий пакет совместимым с Symfony >=4.4.
Требуйте пакет и его зависимости с помощью композитора:
$ composer require php-amqplib/rabbitmq-bundle
Зарегистрируйте пакет:
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
Наслаждаться !
Если у вас есть консольное приложение, используемое для запуска потребителей RabbitMQ, вам не нужны Symfony HttpKernel и FrameworkBundle. Начиная с версии 1.6, вы можете использовать компонент внедрения зависимостей для загрузки конфигурации и служб этого пакета, а затем использовать команду потребителя.
Требуйте пакет в вашем файле композитора.json:
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
Зарегистрируйте расширение и проход компилятора:
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
С 4 июня 2012 г. некоторые параметры по умолчанию для бирж, объявленных в разделе конфигурации «производители», изменились, чтобы соответствовать настройкам по умолчанию для бирж, объявленных в разделе «потребители». Затронутые настройки:
durable
был изменен с false
на true
,auto_delete
было изменено с true
на false
.Ваша конфигурация должна быть обновлена, если вы полагались на предыдущие значения по умолчанию.
С 24 апреля 2012 г. сигнатура метода ConsumerInterface::execute изменилась.
С 3 января 2012 г. метод выполнения потребителей получает весь объект сообщения AMQP, а не только тело. Более подробную информацию смотрите в файле CHANGELOG.
Добавьте раздел old_sound_rabbit_mq
в файл конфигурации:
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
Здесь мы настраиваем службу подключения и конечные точки сообщений, которые будут иметь наше приложение. В этом примере ваш сервисный контейнер будет содержать сервис old_sound_rabbit_mq.upload_picture_producer
и old_sound_rabbit_mq.upload_picture_consumer
. Последний ожидает, что существует служба upload_picture_service
.
Если вы не укажете соединение для клиента, клиент будет искать соединение с тем же псевдонимом. Итак, для нашего upload_picture
сервис-контейнер будет искать соединение upload_picture
.
Если вам нужно добавить дополнительные аргументы очереди, параметры очереди могут быть примерно такими:
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
другой пример с TTL сообщения 20 секунд:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
Значение аргумента должно быть списком типов данных и значений. Допустимые типы данных:
S
— строкаI
- целое числоD
– десятичное числоT
— временные меткиF
– СтолA
— Массивt
- Бул Адаптируйте arguments
в соответствии с вашими потребностями.
Если вы хотите связать очередь с определенными ключами маршрутизации, вы можете объявить это в конфигурации производителя или потребителя:
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
В среде Symfony все сервисы полностью загружаются для каждого запроса, начиная с версии >= 4.3, вы можете объявить сервис как ленивый (Lazy Services). Этот пакет по-прежнему не поддерживает новую функцию Lazy Services, но вы можете установить lazy: true
в конфигурации подключения, чтобы избежать ненужных подключений к вашему брокеру сообщений при каждом запросе. Настоятельно рекомендуется использовать отложенные соединения из соображений производительности, тем не менее, отложенная опция отключена по умолчанию, чтобы избежать возможных сбоев в работе приложений, уже использующих этот пакет.
Рекомендуется установить для read_write_timeout
значение, в два раза превышающее тактовый сигнал, чтобы ваш сокет был открыт. Если вы этого не сделаете или не используете другой множитель, существует риск истечения времени ожидания потребительского сокета.
Имейте в виду, что вы можете ожидать проблем, если ваши задачи обычно выполняются дольше, чем период контрольного сигнала, и для которых нет хороших решений (ссылка). Рассмотрите возможность использования либо большого значения для тактового сигнала, либо оставьте тактовый сигнал отключенным в пользу keepalive
TCP (как на стороне клиента, так и на стороне сервера) и функции graceful_max_execution_timeout
.
Вы можете предоставить несколько хостов для подключения. Это позволит вам использовать кластер RabbitMQ с несколькими узлами.
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
Обратите внимание, что вы не можете указать
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
параметры каждому хосту отдельно.
Иногда информация о вашем соединении может быть динамической. Параметры динамического соединения позволяют задавать или переопределять параметры программно через службу.
например, в сценарии, когда параметр vhost
соединения зависит от текущего арендатора вашего приложения с белой меткой, и вы не хотите (или не можете) менять его конфигурацию каждый раз.
Определите службу в connection_parameters_provider
, которая реализует ConnectionParametersProviderInterface
, и добавьте ее в соответствующую конфигурацию connections
.
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
Пример реализации:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
В этом случае параметр vhost
будет переопределен выходными данными getVhost()
.
В приложении обмена сообщениями процесс, отправляющий сообщения брокеру, называется производителем , а процесс, получающий эти сообщения, называется потребителем . В вашем приложении у вас будет несколько из них, которые вы можете перечислить в соответствующих записях конфигурации.
Производитель будет использоваться для отправки сообщений на сервер. В модели AMQP сообщения отправляются на биржу , это означает, что в конфигурации производителя вам нужно будет указать параметры подключения вместе с опциями обмена, которыми обычно являются имя биржи и ее тип.
Теперь предположим, что вы хотите обрабатывать загрузку изображений в фоновом режиме. После того, как вы переместите изображение в его конечное место, вы опубликуете на сервере сообщение со следующей информацией:
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
Как видите, если в вашей конфигурации у вас есть производитель с именем upload_picture , то в сервис-контейнере у вас будет сервис с именем old_sound_rabbit_mq.upload_picture_producer .
Помимо самого сообщения, метод OldSoundRabbitMqBundleRabbitMqProducer#publish()
также принимает необязательный параметр ключа маршрутизации и необязательный массив дополнительных свойств. Массив дополнительных свойств позволяет вам изменять свойства, с которыми объект PhpAmqpLibMessageAMQPMessage
создается по умолчанию. Таким образом, например, вы можете изменить заголовки приложений.
Вы можете использовать методы setContentType и setDeliveryMode , чтобы установить тип контента сообщения и режим доставки сообщения соответственно, переопределяя любой набор по умолчанию в разделе конфигурации «производители». Если это не переопределено конфигурацией «производителей» или явным вызовом этих методов (как в примере ниже), значения по умолчанию — text/plain для типа контента и 2 для режима доставки.
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
Если вам нужно использовать собственный класс для производителя (который должен наследовать от OldSoundRabbitMqBundleRabbitMqProducer
), вы можете использовать опцию class
:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
Следующая часть головоломки — иметь потребителя, который будет извлекать сообщение из очереди и соответствующим образом его обрабатывать.
В настоящее время продюсер генерирует два события.
Это событие происходит непосредственно перед публикацией сообщения. Это хороший способ выполнить окончательную регистрацию, проверку и т. д. перед фактической отправкой сообщения. Пример реализации прослушивателя:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Это событие происходит сразу после публикации сообщения. Это хороший способ выполнить регистрацию подтверждений, коммитов и т. д. после фактической отправки сообщения. Пример реализации прослушивателя:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Потребитель подключится к серверу и запустит цикл ожидания обработки входящих сообщений. В зависимости от указанного обратного вызова для такого потребителя будет зависеть его поведение. Давайте рассмотрим потребительскую конфигурацию сверху:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
Как мы видим, опция обратного вызова имеет ссылку на upload_picture_service . Когда потребитель получит сообщение от сервера, он выполнит такой обратный вызов. Если для целей тестирования или отладки вам необходимо указать другой обратный вызов, вы можете изменить его там.
Помимо обратного вызова мы также указываем используемое соединение, так же, как мы делаем это с производителем . Остальные параметры — это обмен_опции и очередь_опции . Exchange_options должны быть такими же, как те, которые используются для производителя . В параметреqueue_options мы укажем имя очереди . Почему?
Как мы уже говорили, сообщения в AMQP публикуются на бирже . Это не означает, что сообщение попало в очередь . Чтобы это произошло, сначала нам нужно создать такую очередь , а затем привязать ее к бирже . Самое интересное в этом то, что вы можете привязать несколько очередей к одному обмену , таким образом одно сообщение может прийти в несколько пунктов назначения. Преимущество этого подхода заключается в отделении от производителя и потребителя. Продюсера не волнует, сколько потребителей обработают его сообщения. Все, что ему нужно, это чтобы его сообщение дошло до сервера. Таким образом, мы можем расширить действия, которые мы выполняем каждый раз при загрузке изображения, без необходимости изменять код в нашем контроллере.
Теперь, как запустить потребителя? Для этого есть команда, которую можно выполнить следующим образом:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Что это значит? Мы запускаем потребителя upload_picture, сообщая ему, что он должен обрабатывать только 50 сообщений. Каждый раз, когда потребитель получает сообщение от сервера, он выполняет настроенный обратный вызов, передавая сообщение AMQP как экземпляр класса PhpAmqpLibMessageAMQPMessage
. Тело сообщения можно получить, вызвав $msg->body
. По умолчанию потребитель будет обрабатывать сообщения в бесконечном цикле для некоторого определения бесконечного .
Если вы хотите быть уверены, что потребитель мгновенно завершит выполнение по сигналу Unix, вы можете запустить команду с флагом -w
.
$ ./app/console rabbitmq:consumer -w upload_picture
Тогда потребитель мгновенно завершит выполнение.
Для использования команды с этим флагом вам необходимо установить PHP с расширением PCNTL.
Если вы хотите установить ограничение потребительской памяти, вы можете сделать это, используя флаг -l
. В следующем примере этот флаг добавляет ограничение памяти в 256 МБ. Потребитель будет остановлен за пять МБ до достижения 256 МБ, чтобы избежать ошибки размера разрешенной памяти PHP.
$ ./app/console rabbitmq:consumer -l 256
Если вы хотите удалить все сообщения, ожидающие в очереди, вы можете выполнить эту команду, чтобы очистить эту очередь:
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
Для удаления очереди потребителя используйте следующую команду:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
Это может быть полезно во многих сценариях. Существует 3 события AMQPE:
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` и проверять развертывание нового приложения.
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Событие возникает перед обработкой AMQPMessage
.
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Событие, возникающее после обработки AMQPMessage
. Если сообщение процесса выдаст исключение, событие не возникнет.
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
Событие возникает, когда метод wait
завершается по таймауту без получения сообщения. Чтобы использовать это событие, необходимо настроить idle_timeout
. По умолчанию процесс завершается по тайм-ауту простоя, вы можете предотвратить это, установив $event->setForceStop(false)
в прослушивателе.
Если вам нужно установить тайм-аут, когда в течение определенного периода времени в вашей очереди нет сообщений, вы можете установить idle_timeout
в секундах. idle_timeout_exit_code
указывает, какой код выхода должен быть возвращен потребителем при наступлении тайм-аута простоя. Без его указания потребитель выдаст исключение PhpAmqpLibExceptionAMQPTimeoutException
.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
Установите timeout_wait
в секундах. timeout_wait
указывает, как долго потребитель будет ждать без получения нового сообщения, прежде чем убедиться, что текущее соединение все еще действительно.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
Если вы хотите, чтобы ваш потребитель работал до определенного времени, а затем корректно завершал работу, установите graceful_max_execution.timeout
в секундах. «Мягкий выход» означает, что потребитель выйдет либо после текущей задачи, либо сразу же, при ожидании новых задач. graceful_max_execution.exit_code
указывает, какой код выхода должен быть возвращен потребителем, когда наступает тайм-аут постепенного максимального выполнения. Без указания этого потребитель выйдет со статусом 0
.
Эта функция отлично подходит в сочетании с супервизором, который вместе может обеспечить периодическую очистку утечек памяти, соединение с обновлением базы данных/rabbitmq и многое другое.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
Вы могли заметить, что диспетчеризация по-прежнему работает не так, как нам хотелось бы. Например, в ситуации с двумя работниками, когда все нечетные сообщения тяжелые, а четные - легкие, один работник будет постоянно занят, а другой практически не будет выполнять никакой работы. Ну, RabbitMQ ничего об этом не знает и все равно будет отправлять сообщения равномерно.
Это происходит потому, что RabbitMQ просто отправляет сообщение, когда оно попадает в очередь. Он не учитывает количество неподтвержденных сообщений для потребителя. Он просто слепо отправляет каждое n-е сообщение n-му потребителю.
Чтобы обойти это, мы можем использовать метод Basic.qos с настройкой prefetch_count=1. Это говорит RabbitMQ не отправлять более одного сообщения работнику одновременно. Или, другими словами, не отправляйте новое сообщение работнику, пока он не обработает и не подтвердит предыдущее. Вместо этого он отправит его следующему работнику, который еще не занят.
Откуда: http://www.rabbitmq.com/tutorials/tutorial-two-python.html.
Будьте осторожны, так как реализация справедливой диспетчеризации приводит к задержке, которая снижает производительность (см. этот пост в блоге). Но его реализация позволит вам динамически масштабироваться по горизонтали по мере увеличения очереди. Как рекомендует сообщение в блоге, вам следует оценить правильное значение prefetch_size в соответствии со временем, затраченным на обработку каждого сообщения, и производительностью вашей сети.
С помощью RabbitMqBundle вы можете настроить qos_options для каждого потребителя следующим образом:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
При использовании с пакетом Symfony 4.2+ в контейнере объявляется набор псевдонимов для производителей и обычных потребителей. Они используются для автоматического связывания аргументов на основе объявленного типа и имени аргумента. Это позволяет вам изменить предыдущий пример производителя на:
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
Имя аргумента состоит из имени производителя или потребителя из конфигурации и сопровождается суффиксом слова производителя или потребителя в зависимости от типа. В отличие от соглашения об именовании элементов контейнера, суффикс слова (производитель или потребитель) не будет дублироваться, если имя уже имеет суффикс. Ключ производителя upload_picture
будет изменен на имя аргумента $uploadPictureProducer
. Ключ производителя upload_picture_producer
также будет связан с именем аргумента $uploadPictureProducer
. Лучше всего избегать подобных имён.
Все производители имеют псевдонимы OldSoundRabbitMqBundleRabbitMqProducerInterface
и параметр класса производителя из конфигурации. В режиме песочницы создаются только псевдонимы ProducerInterface
. Настоятельно рекомендуется использовать класс ProducerInterface
при вводе аргументов подсказки для внедрения производителя.
Все потребители имеют псевдонимы OldSoundRabbitMqBundleRabbitMqConsumerInterface
и значения параметра конфигурации %old_sound_rabbit_mq.consumer.class%
. Нет никакой разницы между обычным режимом и режимом песочницы. Настоятельно рекомендуется использовать ConsumerInterface
при вводе аргументов подсказки для внедрения клиента.
Вот пример обратного вызова:
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
Как видите, это так же просто, как реализовать один метод: ConsumerInterface::execute .
Имейте в виду, что ваши обратные вызовы должны быть зарегистрированы как обычные сервисы Symfony. Туда вы можете внедрить сервисный контейнер, сервис базы данных, логгер Symfony и так далее.
См. https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md для получения более подробной информации о том, что является частью экземпляра сообщения.
Чтобы остановить потребителя, обратный вызов может вызвать StopConsumerException
(последнее использованное сообщение не будет подтверждено) или AckStopConsumerException
(сообщение будет подтверждено). При использовании демонизированного, например, супервизора, потребитель фактически перезапустится.
Кажется, что для простой отправки сообщений требуется довольно много работы, давайте подведем итоги, чтобы иметь лучшее представление. Вот что нам нужно для создания/потребления сообщений:
И все!
Это было требованием для отслеживания полученных/опубликованных сообщений. Чтобы включить это, вам необходимо добавить конфигурацию enable_logger
для потребителей или издателей.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
Если вы хотите, вы также можете обрабатывать журналирование из очередей с разными обработчиками в монологе, ссылаясь на канал phpamqplib
.
До сих пор мы просто отправляли сообщения потребителям, но что, если мы хотим получить от них ответ? Для этого нам необходимо реализовать вызовы RPC в нашем приложении. Этот пакет позволяет довольно легко добиться таких результатов с помощью Symfony.
Добавим в конфигурацию клиент и сервер RPC:
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
Для получения полной информации о конфигурации используйте команду php app/console config:dump-reference old_sound_rabbit_mq
.
Здесь у нас есть очень полезный сервер: он возвращает своим клиентам случайные целые числа. Обратным вызовом, используемым для обработки запроса, будет служба random_int_server . Теперь давайте посмотрим, как вызвать его из наших контроллеров.
Сначала нам нужно запустить сервер из командной строки:
$ ./app/console_dev rabbitmq:rpc-server random_int
А затем добавьте в наш контроллер следующий код:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
Как вы можете видеть, если наш идентификатор клиента — целое_магазин , то имя сервиса будет old_sound_rabbit_mq.integer_store_rpc . Как только мы получим этот объект, мы размещаем запрос на сервере, вызывая addRequest
, который ожидает три параметра:
Аргументы, которые мы отправляем, — это минимальное и максимальное значения функции rand()
. Мы отправляем их путем сериализации массива. Если наш сервер ожидает информацию в формате JSON или XML, мы отправим такие данные сюда.
Последняя часть — получить ответ. Наш PHP-скрипт будет блокироваться до тех пор, пока сервер не вернет значение. Переменная $replies будет представлять собой ассоциативный массив, в котором каждый ответ от сервера будет содержаться в соответствующем ключе request_id .
По умолчанию клиент RPC ожидает сериализации ответа. Если сервер, с которым вы работаете, возвращает несериализованный результат, установите для параметра expect_serialized_response
клиента RPC значение false. Например, если сервер Integer_store не сериализовал результат, клиент будет настроен следующим образом:
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
Вы также можете установить срок действия запроса в миллисекундах, после которого сообщение больше не будет обрабатываться сервером, а запрос клиента просто истечет по тайм-ауту. Установка срока действия сообщений работает только для RabbitMQ 3.x и выше. Посетите http://www.rabbitmq.com/ttl.html#per-message-ttl для получения дополнительной информации.
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
Как вы можете догадаться, мы также можем совершать параллельные вызовы RPC .
Допустим, для рендеринга некоторой веб-страницы вам необходимо выполнить два запроса к базе данных, один из которых занимает 5 секунд, а другой — 2 секунды (очень дорогостоящие запросы). Если вы выполните их последовательно, ваша страница будет готова к отправке примерно через 7 секунд. Если вы запустите их параллельно, ваша страница будет обслужена примерно через 5 секунд. С RabbitMqBundle
мы можем легко выполнять такие параллельные вызовы. Определим в конфиге параллельный клиент и еще один RPC-сервер:
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
Затем этот код должен войти в наш контроллер:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
Очень похоже на предыдущий пример, только у нас есть дополнительный вызов addRequest
. Также мы предоставляем значимые идентификаторы запросов, чтобы позже нам было легче найти нужный ответ в массиве $replies .
Чтобы включить прямой ответ клиентам, вам просто нужно включить опцию Direct_reply_to в конфигурации rpc_clients для клиента.
Эта опция будет использовать псевдоочередь amq.rabbitmq.reply-to при выполнении вызовов RPC. На сервере RPC никаких изменений не требуется.
RabbitMQ имеет реализацию очереди приоритетов в ядре, начиная с версии 3.5.0. Любую очередь можно превратить в приоритетную, используя дополнительные аргументы, предоставленные клиентом (но, в отличие от других функций, которые используют необязательные аргументы, а не политики). Реализация поддерживает ограниченное количество приоритетов: 255. Рекомендуются значения от 1 до 10. Проверьте документацию
вот как вы можете объявить приоритетную очередь
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
если очередь upload-picture
существует до того, как вы должны удалить эту очередь перед запуском команды rabbitmq:setup-fabric
Теперь предположим, что вы хотите создать сообщение с высоким приоритетом, вам необходимо опубликовать сообщение с этой дополнительной информацией.
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
Хорошей практикой является наличие большого количества очередей для логического разделения. При использовании простого потребителя вам придется создать по одному работнику (потребителю) для каждой очереди, и с этим может быть сложно справиться при работе со многими эволюциями (забудете добавить строку в конфигурацию супервизора?). Это также полезно для небольших очередей, поскольку вы, возможно, не захотите иметь столько рабочих процессов, сколько очередей, и захотите перегруппировать некоторые задачи вместе, не теряя при этом гибкости и принципа разделения.
Несколько потребителей позволяют вам обрабатывать этот вариант использования, прослушивая несколько очередей одного и того же потребителя.
Вот как вы можете настроить потребителя с несколькими очередями:
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
Обратный вызов теперь указан для каждой очереди и должен реализовывать ConsumerInterface
как простой потребитель. Все параметры queues-options
в потребителе доступны для каждой очереди.
Имейте в виду, что все очереди находятся в одном обмене, поэтому вам нужно установить правильную маршрутизацию для обратных вызовов.
queues_provider
— это дополнительная служба, которая динамически предоставляет очереди. Он должен реализовать QueuesProviderInterface
.
Имейте в виду, что поставщики очередей несут ответственность за правильные вызовы setDequeuer
и что обратные вызовы являются вызываемыми объектами (а не ConsumerInterface
). В случае, если очереди предоставления услуг реализуют DequeuerAwareInterface
, вызов setDequeuer
добавляется к определению службы, при этом DequeuerInterface
в настоящее время является MultipleConsumer
.
Вы можете обнаружить, что ваше приложение имеет сложный рабочий процесс и вам необходима произвольная привязка. Сценарии произвольной привязки могут включать привязки обмена между обменами через свойство destination_is_exchange
.
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
Команда Rabbitmq:setup-fabric объявит обмены и очереди, как определено в ваших конфигурациях производителя, потребителя и нескольких потребителей, прежде чем она создаст ваши произвольные привязки. Однако Rabbitmq:setup-fabric НЕ будет объявлять очереди добавления и обмены, определенные в привязках. Вы должны убедиться, что обмены/очереди объявлены.
Иногда приходится менять конфигурацию потребителя на лету. Динамические потребители позволяют вам программно определять параметры очереди потребителей в зависимости от контекста.
например, в сценарии, когда определенный потребитель должен отвечать за динамическое количество тем, и вы не хотите (или не можете) каждый раз менять его конфигурацию.
Определите службу queue_options_provider
, реализующую QueueOptionsProviderInterface
, и добавьте ее в конфигурацию dynamic_consumers
.
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
Пример использования:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
В этом случае потребитель proc_logs
работает для server1
и может выбирать, какие параметры очереди использовать.
Итак, зачем нам когда-либо понадобятся анонимные потребители? Это похоже на какую-то интернет-угрозу или что-то в этом роде… Продолжайте читать.
В AMQP есть тип обмена, называемый темой , где сообщения направляются в очереди в зависимости, как вы догадываетесь, от темы сообщения. Мы можем отправлять журналы о нашем приложении на обмен тем RabbiMQ, используя в качестве темы имя хоста, на котором был создан журнал, и серьезность такого журнала. Тело сообщения будет содержанием журнала, а наши ключи маршрутизации будут такими:
Поскольку мы не хотим заполнять очереди неограниченным количеством журналов, мы можем сделать следующее: когда мы хотим контролировать систему, мы можем запустить потребителя, который создает очередь и подключается к обмену журналами на основе какой-то темы, например , мы хотели бы видеть все ошибки, о которых сообщают наши серверы. Ключ маршрутизации будет выглядеть примерно так: #.error . В таком случае нам нужно придумать имя очереди, привязать ее к бирже, получить логи, отвязать и удалить очередь. К счастью, AMPQ предоставляет возможность сделать это автоматически, если вы предоставите правильные параметры при объявлении и привязке очереди. Проблема в том, что вы не хотите запоминать все эти варианты. По этой причине мы внедрили шаблон анонимного потребителя .
Когда мы запускаем Anonymous Consumer, он позаботится о таких деталях, и нам просто нужно подумать о реализации обратного вызова при поступлении сообщений. Он называется анонимным, потому что он не будет указывать имя очереди, а будет ждать, пока RabbitMQ присвоит ей случайное имя.
Теперь, как настроить и запустить такого потребителя?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
Там мы указываем имя обмена и его тип, а также обратный вызов, который должен выполняться при поступлении сообщения.
Этот анонимный потребитель теперь может слушать производителей, которые связаны с той же биржей и относятся к теме типа:
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
Чтобы запустить анонимного потребителя, мы используем следующую команду:
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
Единственная новая опция по сравнению с командами, которые мы видели ранее, — это та, которая указывает ключ маршрутизации : -r '#.error'
.
В некоторых случаях вам может потребоваться получить пакет сообщений, а затем выполнить некоторую обработку их всех. Пакетные потребители позволят вам определить логику для этого типа обработки.
например: Представьте, что у вас есть очередь, в которой вы получаете сообщение о вставке некоторой информации в базу данных, и вы понимаете, что если вы выполняете пакетную вставку, это намного лучше, чем вставка по одному.
Определите службу обратного вызова, реализующую BatchConsumerInterface
, и добавьте определение потребителя в свою конфигурацию.
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
Примечание . Если для параметра keep_alive
установлено значение true
, idle_timeout_exit_code
будет игнорироваться, и потребительский процесс продолжится.
Вы можете реализовать пакетный потребитель, который будет подтверждать все сообщения за один возврат, или вы можете контролировать, какое сообщение подтверждать.
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
Как запустить следующий пакетный потребитель:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
Важно! Для BatchConsumers не будет доступна опция -m|messages
Важно. Для BatchConsumers также может быть доступна опция -b|batches
, если вы хотите использовать только определенное количество пакетов, а затем остановить потребителя. Указывайте количество пакетов только в том случае, если вы хотите, чтобы потребитель остановился после того, как эти пакетные сообщения были обработаны!
Есть команда, которая считывает данные из STDIN и публикует их в очередь RabbitMQ. Чтобы использовать его, сначала вам необходимо настроить службу producer
в файле конфигурации следующим образом:
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
Этот производитель будет публиковать сообщения со words
прямого обмена. Конечно, вы можете адаптировать конфигурацию по своему усмотрению.
Предположим, вы хотите опубликовать содержимое некоторых XML-файлов, чтобы их обрабатывала группа потребителей. Вы можете опубликовать их, просто используя такую команду:
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
Это означает, что вы можете создавать производителей с помощью простых команд Unix.
Давайте разложим этот один лайнер:
$ find vendor/symfony/ -name " *.xml " -print0
Эта команда найдет все файлы .xml
внутри папки symfony и распечатает имя файла. Каждое из этих имен файлов затем передается в cat
через xargs
:
$ xargs -0 cat
И, наконец, вывод cat
поступает непосредственно нашему производителю, который вызывается следующим образом:
$ ./app/console rabbitmq:stdin-producer words
Он принимает только один аргумент — имя производителя, настроенное вами в файле config.yml
.
Цель этого пакета — позволить вашему приложению создавать сообщения и публиковать их на некоторых настроенных вами обменах.
В некоторых случаях, даже если ваша конфигурация правильна, создаваемые вами сообщения не будут направляться ни в одну очередь, поскольку ее не существует. Для создания очереди необходимо запустить потребителя, ответственного за потребление очереди.
Запуск команды для каждого потребителя может оказаться кошмаром, если число потребителей велико.
Чтобы одновременно создать обмены, очереди и привязки и быть уверенным, что вы не потеряете ни одного сообщения, вы можете запустить следующую команду:
$ ./app/console rabbitmq:setup-fabric
При желании вы можете настроить своих потребителей и производителей так, чтобы они предполагали, что структура RabbitMQ уже определена. Для этого добавьте в свою конфигурацию следующее:
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
По умолчанию потребитель или производитель объявляет все, что ему нужно, с помощью RabbitMQ при его запуске. Будьте осторожны, используя это, если обмены или очереди не определены, возникнут ошибки. Когда вы изменили какую-либо конфигурацию, вам нужно запустить приведенную выше команду setup-fabric, чтобы объявить вашу конфигурацию.
Чтобы внести свой вклад, просто откройте запрос на включение с вашим новым кодом, принимая во внимание, что если вы добавляете новые функции или изменяете существующие, вам необходимо задокументировать в этом README, что они делают. Если вы нарушите BC, вам также придется это задокументировать. Также вам необходимо обновить CHANGELOG. Так:
См.: resources/meta/LICENSE.md.
Структура пакета и документация частично основаны на RedisBundle.