RabbitMqBundle
incorpora mensajería en su aplicación a través de RabbitMQ utilizando la biblioteca php-amqplib.
El paquete implementa varios patrones de mensajería como se ve en la biblioteca Thumper. Por lo tanto, publicar mensajes en RabbitMQ desde un controlador Symfony es tan fácil como:
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
Más adelante, cuando desee consumir 50 mensajes de la cola upload_pictures
, simplemente ejecute en la CLI:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Todos los ejemplos esperan un servidor RabbitMQ en ejecución.
Este paquete se presentó en la conferencia Symfony Live Paris 2011. Vea las diapositivas aquí.
Debido a los cambios importantes ocurridos causados por Symfony >=4.4, se lanzó una nueva etiqueta que hace que el paquete sea compatible con Symfony >=4.4.
Requerir el paquete y sus dependencias con el compositor:
$ composer require php-amqplib/rabbitmq-bundle
Registre el paquete:
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
Disfrutar !
Si tiene una aplicación de consola utilizada para ejecutar consumidores RabbitMQ, no necesita Symfony HttpKernel ni FrameworkBundle. A partir de la versión 1.6, puede usar el componente Inyección de dependencia para cargar la configuración y los servicios de este paquete y luego usar el comando del consumidor.
Requiere el paquete en tu archivo compositor.json:
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
Registre la extensión y el pase del compilador:
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
Desde el 4 de junio de 2012, algunas opciones predeterminadas para los intercambios declarados en la sección de configuración "productores" han cambiado para coincidir con los valores predeterminados de los intercambios declarados en la sección "consumidores". Las configuraciones afectadas son:
durable
se cambió de false
a true
,auto_delete
se cambió de true
a false
.Su configuración debe actualizarse si confiaba en los valores predeterminados anteriores.
Desde el 24 de abril de 2012, la firma del método ConsumerInterface::execute ha cambiado
Desde el 3 de enero de 2012, el método de ejecución de los consumidores obtiene todo el objeto del mensaje AMQP y no solo el cuerpo. Consulte el archivo CHANGELOG para obtener más detalles.
Agregue la sección old_sound_rabbit_mq
en su archivo de configuración:
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
Aquí configuramos el servicio de conexión y los endpoints de mensajes que tendrá nuestra aplicación. En este ejemplo, su contenedor de servicios contendrá el servicio old_sound_rabbit_mq.upload_picture_producer
y old_sound_rabbit_mq.upload_picture_consumer
. Este último espera que haya un servicio llamado upload_picture_service
.
Si no especifica una conexión para el cliente, el cliente buscará una conexión con el mismo alias. Entonces, para nuestro upload_picture
el contenedor de servicios buscará una conexión upload_picture
.
Si necesita agregar argumentos de cola opcionales, sus opciones de cola pueden ser algo como esto:
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
otro ejemplo con mensaje TTL de 20 segundos:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
El valor del argumento debe ser una lista de tipo de datos y valor. Los tipos de datos válidos son:
S
- CuerdaI
- EnteroD
-DecimalT
- Marcas de tiempoF
- TablaA
- matrizt
-booleano Adapta los arguments
según tus necesidades.
Si desea vincular la cola con claves de enrutamiento específicas, puede declararla en la configuración del productor o del consumidor:
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
En un entorno Symfony, todos los servicios se inician completamente para cada solicitud; a partir de la versión >= 4.3 puedes declarar un servicio como diferido (Lazy Services). Este paquete aún no admite la nueva función Lazy Services, pero puede configurar lazy: true
en la configuración de su conexión para evitar conexiones innecesarias a su agente de mensajes en cada solicitud. Se recomienda encarecidamente utilizar conexiones diferidas por motivos de rendimiento; sin embargo, la opción diferida está deshabilitada de forma predeterminada para evitar posibles interrupciones en las aplicaciones que ya utilizan este paquete.
Es una buena idea configurar read_write_timeout
al doble del latido para que su socket esté abierto. Si no hace esto, o usa un multiplicador diferente, existe el riesgo de que se agote el tiempo de espera del socket del consumidor .
Tenga en cuenta que puede esperar problemas si sus tareas generalmente duran más que el período de latido, para lo cual no hay buenas soluciones (enlace). Considere usar un valor grande para el latido o dejar el latido deshabilitado en favor de la keepalive
de tcp (tanto en el lado del cliente como del servidor) y la función graceful_max_execution_timeout
.
Puede proporcionar varios hosts para una conexión. Esto le permitirá utilizar el clúster RabbitMQ con varios nodos.
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
Presta atención que no puedes especificar.
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
parámetros a cada host por separado.
A veces, es posible que la información de su conexión deba ser dinámica. Los parámetros de conexión dinámica le permiten proporcionar o anular parámetros mediante programación a través de un servicio.
por ejemplo, en un escenario en el que el parámetro vhost
de la conexión depende del inquilino actual de su aplicación de etiqueta blanca y no desea (o no puede) cambiar su configuración cada vez.
Defina un servicio en connection_parameters_provider
que implemente ConnectionParametersProviderInterface
y agréguelo a la configuración connections
adecuada.
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
Implementación de ejemplo:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
En este caso, el parámetro vhost
será anulado por la salida de getVhost()
.
En una aplicación de mensajería, el proceso que envía mensajes al corredor se llama productor, mientras que el proceso que recibe esos mensajes se llama consumidor . En tu aplicación tendrás varios de ellos que podrás enumerar en sus respectivas entradas en la configuración.
Se utilizará un productor para enviar mensajes al servidor. En el Modelo AMQP los mensajes se envían a un exchange , esto quiere decir que en la configuración para un productor tendrás que especificar las opciones de conexión junto con las opciones de exchange, que normalmente será el nombre del exchange y el tipo del mismo.
Ahora digamos que desea procesar la carga de imágenes en segundo plano. Después de mover la imagen a su ubicación final, publicará un mensaje en el servidor con la siguiente información:
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
Como puedes ver, si en tu configuración tienes un productor llamado upload_picture , entonces en el contenedor de servicios tendrás un servicio llamado old_sound_rabbit_mq.upload_picture_producer .
Además del mensaje en sí, el método OldSoundRabbitMqBundleRabbitMqProducer#publish()
también acepta un parámetro de clave de enrutamiento opcional y una matriz opcional de propiedades adicionales. La matriz de propiedades adicionales le permite modificar las propiedades con las que se construye un objeto PhpAmqpLibMessageAMQPMessage
de forma predeterminada. De esta forma, por ejemplo, puedes cambiar los encabezados de la aplicación.
Puede utilizar los métodos setContentType y setDeliveryMode para configurar el tipo de contenido del mensaje y el modo de entrega del mensaje respectivamente, anulando cualquier conjunto predeterminado en la sección de configuración "productores". Si no se anula mediante la configuración de "productores" o una llamada explícita a estos métodos (como en el ejemplo siguiente), los valores predeterminados son texto/sin formato para el tipo de contenido y 2 para el modo de entrega.
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
Si necesita usar una clase personalizada para un productor (que debería heredar de OldSoundRabbitMqBundleRabbitMqProducer
), puede usar la opción class
:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
La siguiente pieza del rompecabezas es tener un consumidor que saque el mensaje de la cola y lo procese en consecuencia.
Actualmente hay dos eventos emitidos por la productora.
Este evento ocurre inmediatamente antes de publicar el mensaje. Este es un buen enlace para realizar cualquier registro final, validación, etc. antes de enviar el mensaje. Una implementación de muestra de un oyente:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Este evento ocurre inmediatamente después de publicar el mensaje. Este es un buen enlace para realizar registros de confirmación, confirmaciones, etc. después de enviar el mensaje. Una implementación de muestra de un oyente:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Un consumidor se conectará al servidor e iniciará un bucle esperando que se procesen los mensajes entrantes. Dependiendo de la devolución de llamada especificada para dicho consumidor será el comportamiento que tendrá. Repasemos la configuración del consumidor desde arriba:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
Como vemos allí, la opción de devolución de llamada tiene una referencia a upload_picture_service . Cuando el consumidor recibe un mensaje del servidor, ejecutará dicha devolución de llamada. Si para fines de prueba o depuración necesita especificar una devolución de llamada diferente, puede cambiarla allí.
Aparte de la devolución de llamada, también especificamos la conexión a utilizar, de la misma manera que lo hacemos con un productor . Las opciones restantes son exchange_options y queue_options . Las opciones de intercambio deben ser las mismas que las utilizadas para el productor . En queue_options proporcionaremos un nombre de cola . ¿Por qué?
Como dijimos, los mensajes en AMQP se publican en un intercambio . Esto no significa que el mensaje haya llegado a una cola . Para que esto suceda, primero debemos crear dicha cola y luego vincularla al intercambio . Lo bueno de esto es que puedes vincular varias colas a un intercambio , de esa manera un mensaje puede llegar a varios destinos. La ventaja de este enfoque es la desvinculación entre el productor y el consumidor. Al productor no le importa cuántos consumidores procesarán sus mensajes. Lo único que necesita es que su mensaje llegue al servidor. De esta forma podremos ampliar las acciones que realizamos cada vez que se sube una imagen sin necesidad de cambiar el código en nuestro controlador.
Ahora bien, ¿cómo gestionar un consumidor? Hay un comando que se puede ejecutar así:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
¿Qué quiere decir esto? Estamos ejecutando el consumidor upload_picture diciéndole que consuma solo 50 mensajes. Cada vez que el consumidor recibe un mensaje del servidor, ejecutará la devolución de llamada configurada pasando el mensaje AMQP como una instancia de la clase PhpAmqpLibMessageAMQPMessage
. El cuerpo del mensaje se puede obtener llamando $msg->body
. De forma predeterminada, el consumidor procesará los mensajes en un bucle sin fin para obtener alguna definición de infinito .
Si desea estar seguro de que el consumidor terminará de ejecutarse instantáneamente en la señal de Unix, puede ejecutar el comando con la bandera -w
.
$ ./app/console rabbitmq:consumer -w upload_picture
Entonces el consumidor terminará de ejecutarse instantáneamente.
Para usar el comando con esta bandera, necesita instalar PHP con la extensión PCNTL.
Si desea establecer un límite de memoria del consumidor, puede hacerlo usando flag -l
. En el siguiente ejemplo, esta bandera agrega un límite de memoria de 256 MB. El consumidor se detendrá cinco MB antes de alcanzar los 256 MB para evitar un error de tamaño de memoria permitido por PHP.
$ ./app/console rabbitmq:consumer -l 256
Si desea eliminar todos los mensajes en espera en una cola, puede ejecutar este comando para purgar esta cola:
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
Para eliminar la cola del consumidor, utilice este comando:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
Esto puede resultar útil en muchos escenarios. Hay 3 eventos AMQPE:
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` y comprobar si se ha implementado una nueva aplicación.
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Evento generado antes de procesar un AMQPMessage
.
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Evento generado después de procesar un AMQPMessage
. Si el mensaje de proceso arroja una excepción, el evento no se generará.
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
Evento generado cuando el método wait
sale por tiempo de espera sin recibir un mensaje. Para poder hacer uso de este evento se debe configurar un idle_timeout
del consumidor. De forma predeterminada, el proceso sale en el tiempo de espera de inactividad, puede evitarlo configurando $event->setForceStop(false)
en un oyente.
Si necesita establecer un tiempo de espera cuando no hay mensajes de su cola durante un período de tiempo, puede configurar idle_timeout
en segundos. El idle_timeout_exit_code
especifica qué código de salida debe devolver el consumidor cuando se produce el tiempo de espera de inactividad. Sin especificarlo, el consumidor generará una excepción PhpAmqpLibExceptionAMQPTimeoutException
.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
Establezca el timeout_wait
en segundos. timeout_wait
especifica cuánto tiempo esperará el consumidor sin recibir un nuevo mensaje antes de asegurarse de que la conexión actual siga siendo válida.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
Si desea que su consumidor esté funcionando hasta cierto tiempo y luego salga con gracia, configure graceful_max_execution.timeout
en segundos. "Salir con gracia" significa que el consumidor saldrá después de la tarea que se está ejecutando actualmente o inmediatamente, mientras espera nuevas tareas. graceful_max_execution.exit_code
especifica qué código de salida debe devolver el consumidor cuando se agota el tiempo de espera máximo de ejecución elegante. Sin especificarlo, el consumidor saldrá con el estado 0
.
Esta característica es excelente en conjunto con supervisord, que en conjunto puede permitir la limpieza periódica de pérdidas de memoria, la conexión con la renovación de la base de datos/rabbitmq y más.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
Es posible que haya notado que el envío aún no funciona exactamente como queremos. Por ejemplo, en una situación con dos trabajadores, cuando todos los mensajes impares son pesados y los mensajes pares son livianos, un trabajador estará constantemente ocupado y el otro apenas realizará ningún trabajo. Bueno, RabbitMQ no sabe nada sobre eso y seguirá enviando mensajes de manera uniforme.
Esto sucede porque RabbitMQ simplemente envía un mensaje cuando el mensaje ingresa a la cola. No analiza la cantidad de mensajes no reconocidos de un consumidor. Simplemente envía ciegamente cada enésimo mensaje al enésimo consumidor.
Para solucionar esto, podemos usar el método basic.qos con la configuración prefetch_count=1. Esto le indica a RabbitMQ que no dé más de un mensaje a un trabajador a la vez. O, en otras palabras, no envíe un nuevo mensaje a un trabajador hasta que haya procesado y reconocido el anterior. En lugar de ello, lo enviará al siguiente trabajador que todavía no esté ocupado.
De: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
Tenga cuidado, ya que la implementación del envío justo introduce una latencia que afectará el rendimiento (consulte esta publicación de blog). Pero implementarlo le permite escalar horizontalmente de forma dinámica a medida que aumenta la cola. Debe evaluar, como recomienda la publicación del blog, el valor correcto de prefetch_size de acuerdo con el tiempo necesario para procesar cada mensaje y el rendimiento de su red.
Con RabbitMqBundle, puedes configurar qos_options por consumidor de esta manera:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
Si se usa con el paquete Symfony 4.2+ , declara en un contenedor un conjunto de alias para productores y consumidores habituales. Se utilizan para el cableado automático de argumentos según el tipo declarado y el nombre del argumento. Esto le permite cambiar el ejemplo de productor anterior a:
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
El nombre del argumento se construye a partir del nombre del productor o del consumidor de la configuración y tiene el sufijo de la palabra del productor o del consumidor según el tipo. A diferencia de los elementos del contenedor, el sufijo de palabra de convención de nomenclatura (productor o consumidor) no se duplicará si el nombre ya tiene el sufijo. La clave del productor upload_picture
se cambiará al nombre del argumento $uploadPictureProducer
. La clave de productor upload_picture_producer
también tendría un alias con el nombre del argumento $uploadPictureProducer
. Es mejor evitar nombres similares en ese sentido.
Todos los productores tienen un alias de OldSoundRabbitMqBundleRabbitMqProducerInterface
y la opción de clase de productor desde la configuración. En el modo sandbox solo se crean alias ProducerInterface
. Se recomienda encarecidamente utilizar la clase ProducerInterface
al escribir argumentos de sugerencias para la inyección del productor.
Todos los consumidores tienen un alias de OldSoundRabbitMqBundleRabbitMqConsumerInterface
y el valor de la opción de configuración %old_sound_rabbit_mq.consumer.class%
. No hay diferencia entre el modo normal y el sandbox. Se recomienda encarecidamente utilizar ConsumerInterface
al escribir argumentos de sugerencia para la inyección del cliente.
Aquí hay un ejemplo de devolución de llamada:
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
Como puede ver, esto es tan simple como implementar un método: ConsumerInterface::execute .
Tenga en cuenta que sus devoluciones de llamada deben registrarse como servicios normales de Symfony. Allí puede inyectar el contenedor de servicios, el servicio de base de datos, el registrador de Symfony, etc.
Consulte https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md para obtener más detalles sobre lo que forma parte de una instancia de mensaje.
Para detener al consumidor, la devolución de llamada puede generar StopConsumerException
(el último mensaje consumido no será un reconocimiento) o AckStopConsumerException
(el mensaje será un reconocimiento). Si utiliza demonizado, por ejemplo: supervisor, el consumidor realmente reiniciará.
Esto parece ser mucho trabajo solo para enviar mensajes, recapitulemos para tener una mejor descripción general. Esto es lo que necesitamos para producir/consumir mensajes:
¡Y eso es todo!
Este era un requisito para tener una trazabilidad de los mensajes recibidos/publicados. Para habilitar esto, deberá agregar la configuración enable_logger
a los consumidores o editores.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
Si lo desea, también puede tratar el registro de colas con diferentes controladores en monolog, haciendo referencia al canal phpamqplib
.
Hasta ahora sólo hemos enviado mensajes a los consumidores, pero ¿qué pasa si queremos obtener una respuesta de ellos? Para lograr esto tenemos que implementar llamadas RPC en nuestra aplicación. Este paquete hace que sea bastante fácil lograr este tipo de cosas con Symfony.
Agreguemos un cliente y un servidor RPC a la configuración:
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
Para obtener una referencia de configuración completa, utilice el comando php app/console config:dump-reference old_sound_rabbit_mq
.
Aquí tenemos un servidor muy útil: devuelve números enteros aleatorios a sus clientes. La devolución de llamada utilizada para procesar la solicitud será el servicio random_int_server . Ahora veamos cómo invocarlo desde nuestros controladores.
Primero tenemos que iniciar el servidor desde la línea de comando:
$ ./app/console_dev rabbitmq:rpc-server random_int
Y luego agregue el siguiente código a nuestro controlador:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
Como puede ver allí, si nuestra identificación de cliente es integer_store , entonces el nombre del servicio será old_sound_rabbit_mq.integer_store_rpc . Una vez que obtenemos ese objeto, realizamos una solicitud en el servidor llamando addRequest
que espera tres parámetros:
Los argumentos que enviamos son los valores mínimo y máximo para la función rand()
. Los enviamos serializando una matriz. Si nuestro servidor espera información JSON o XML, enviaremos dichos datos aquí.
La pieza final es obtener la respuesta. Nuestro script PHP se bloqueará hasta que el servidor devuelva un valor. La variable $respuestas será una matriz asociativa donde cada respuesta del servidor estará contenida en la clave request_id respectiva.
De forma predeterminada, el cliente RPC espera que la respuesta sea serializada. Si el servidor con el que está trabajando devuelve un resultado no serializado, establezca la opción expect_serialized_response
del cliente RPC en falso. Por ejemplo, si el servidor integer_store no serializó el resultado, el cliente se configuraría de la siguiente manera:
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
También puede establecer una caducidad para la solicitud en milisegundos, después de lo cual el servidor ya no manejará el mensaje y la solicitud del cliente simplemente expirará. La configuración de la caducidad de los mensajes solo funciona para RabbitMQ 3.xy versiones posteriores. Visite http://www.rabbitmq.com/ttl.html#per-message-ttl para obtener más información.
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
Como puedes adivinar, también podemos realizar llamadas RPC paralelas .
Digamos que para renderizar una página web, necesita realizar dos consultas a la base de datos, una que tarda 5 segundos en completarse y la otra que tarda 2 segundos (consultas muy costosas). Si los ejecuta de forma secuencial, su página estará lista para entregarse en aproximadamente 7 segundos. Si los ejecuta en paralelo, su página se publicará en aproximadamente 5 segundos. Con RabbitMqBundle
podemos realizar este tipo de llamadas paralelas con facilidad. Definamos un cliente paralelo en la configuración y otro servidor RPC:
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
Entonces este código debería ir en nuestro controlador:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
Es muy similar al ejemplo anterior, solo que tenemos una llamada addRequest
adicional. También proporcionamos identificadores de solicitud significativos para que luego nos resulte más fácil encontrar la respuesta que queremos en la matriz $respuestas .
Para habilitar la respuesta directa a los clientes, solo tiene que habilitar la opción direct_reply_to en la configuración de rpc_clients para el cliente.
Esta opción utilizará la pseudocola amq.rabbitmq.reply-to al realizar llamadas RPC. En el servidor RPC no es necesaria ninguna modificación.
RabbitMQ tiene una implementación de cola prioritaria en el núcleo a partir de la versión 3.5.0. Cualquier cola se puede convertir en una cola prioritaria utilizando argumentos opcionales proporcionados por el cliente (pero, a diferencia de otras funciones que utilizan argumentos opcionales, no políticas). La implementación admite un número limitado de prioridades: 255. Se recomiendan valores entre 1 y 10. Consultar documentación
así es como puedes declarar una cola prioritaria
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
Si existe una cola upload-picture
antes, debe eliminar esta cola antes de ejecutar el comando rabbitmq:setup-fabric
Ahora digamos que quieres hacer un mensaje con alta prioridad, tienes que publicar el mensaje con esta información adicional.
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
Es una buena práctica tener muchas colas para la separación lógica. Con un consumidor simple, tendrá que crear un trabajador (consumidor) por cola y puede ser difícil de administrar cuando se trata de muchas evoluciones (¿olvidó agregar una línea en su configuración de supervisor?). Esto también es útil para colas pequeñas, ya que es posible que no desee tener tantos trabajadores como colas y desee reagrupar algunas tareas sin perder flexibilidad y principio de separación.
Varios consumidores le permiten manejar este caso de uso escuchando varias colas en el mismo consumidor.
Así es como puede configurar un consumidor con múltiples colas:
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
La devolución de llamada ahora se especifica en cada cola y debe implementar ConsumerInterface
como un simple consumidor. Todas las opciones de queues-options
en el consumidor están disponibles para cada cola.
Tenga en cuenta que todas las colas están bajo el mismo intercambio; depende de usted establecer la ruta correcta para las devoluciones de llamada.
queues_provider
es un servicio opcional que proporciona colas dinámicamente. Debe implementar QueuesProviderInterface
.
Tenga en cuenta que los proveedores de colas son responsables de las llamadas adecuadas a setDequeuer
y que las devoluciones de llamada son invocables (no ConsumerInterface
). En caso de que el servicio que proporciona colas implemente DequeuerAwareInterface
, se agrega una llamada a setDequeuer
a la definición del servicio con DequeuerInterface
siendo actualmente un MultipleConsumer
.
Es posible que su aplicación tenga un flujo de trabajo complejo y necesite un enlace arbitrario. Los escenarios de vinculación arbitraria pueden incluir vinculaciones de intercambio a intercambio a través de la propiedad destination_is_exchange
.
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
El comando Rabbitmq:setup-fabric declarará intercambios y colas tal como se definen en sus configuraciones de productor, consumidor y multiconsumidor antes de crear sus enlaces arbitrarios. Sin embargo, Rabbitmq:setup-fabric NO declarará colas adicionales ni intercambios definidos en los enlaces. Depende de usted asegurarse de que se declaren los intercambios/colas.
A veces es necesario cambiar la configuración del consumidor sobre la marcha. Los consumidores dinámicos le permiten definir las opciones de la cola de consumidores mediante programación, según el contexto.
por ejemplo, en un escenario en el que el consumidor definido debe ser responsable de una cantidad dinámica de temas y no desea (o no puede) cambiar su configuración cada vez.
Defina un servicio queue_options_provider
que implemente QueueOptionsProviderInterface
y agréguelo a su configuración dynamic_consumers
.
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
Uso de ejemplo:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
En este caso, el consumidor proc_logs
se ejecuta para server1
y puede decidir las opciones de cola que utiliza.
Ahora bien, ¿por qué necesitaremos consumidores anónimos? Esto suena como una amenaza en Internet o algo así... Sigue leyendo.
En AMQP hay un tipo de intercambio llamado tema donde los mensajes se enrutan a colas según –adivinas– el tema del mensaje. Podemos enviar registros sobre nuestra aplicación a un intercambio de temas de RabbiMQ utilizando como tema el nombre de host donde se creó el registro y la gravedad de dicho registro. El cuerpo del mensaje será el contenido del registro y nuestras claves de enrutamiento serán así:
Como no queremos llenar colas con registros ilimitados, lo que podemos hacer es que cuando queramos monitorear el sistema, podemos iniciar un consumidor que crea una cola y la adjunta al intercambio de registros según algún tema, por ejemplo. , nos gustaría ver todos los errores reportados por nuestros servidores. La clave de enrutamiento será algo así como: #.error . En tal caso, tenemos que crear un nombre de cola, vincularlo al intercambio, obtener los registros, desvincularlo y eliminar la cola. Afortunadamente, AMPQ proporciona una manera de hacer esto automáticamente si proporciona las opciones correctas cuando declara y vincula la cola. El problema es que no quieres recordar todas esas opciones. Por tal motivo implementamos el patrón Consumidor Anónimo .
Cuando iniciamos un Consumidor Anónimo, este se encargará de dichos detalles y solo tenemos que pensar en implementar el callback para cuando lleguen los mensajes. ¿Se llama Anónimo porque no especificará un nombre de cola, pero esperará a que RabbitMQ le asigne uno aleatorio?
Ahora bien, ¿cómo configurar y ejecutar dicho consumidor?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
Allí especificamos el nombre del intercambio y su tipo junto con la devolución de llamada que se debe ejecutar cuando llega un mensaje.
Este Consumidor Anónimo ahora puede escuchar a los Productores, que están vinculados al mismo intercambio y del tipo tema :
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
Para iniciar un Consumidor Anónimo usamos el siguiente comando:
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
La única opción nueva respecto a los comandos que hemos visto antes es la que especifica la clave de enrutamiento : -r '#.error'
.
En algunos casos, querrás obtener un lote de mensajes y luego procesarlos todos. Los consumidores por lotes le permitirán definir la lógica para este tipo de procesamiento.
Por ejemplo: imagina que tienes una cola donde recibes un mensaje para insertar información en la base de datos y te das cuenta de que si haces una inserción por lotes, es mucho mejor que insertar uno por uno.
Defina un servicio de devolución de llamada que implemente BatchConsumerInterface
y agregue la definición del consumidor a su configuración.
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
Nota : Si la opción keep_alive
está configurada en true
, se ignorará idle_timeout_exit_code
y el proceso del consumidor continúa.
Puede implementar un consumidor por lotes que reconocerá todos los mensajes en una sola devolución o puede tener control sobre qué mensaje reconocer.
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
Cómo ejecutar el siguiente consumidor por lotes:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
Importante: BatchConsumers no tendrá la opción -m|messages
disponible Importante: BatchConsumers también puede tener la opción -b|batches
disponible si solo desea consumir una cantidad específica de lotes y luego detener al consumidor. Proporcione el número de lotes solo si desea que el consumidor se detenga después de que se hayan consumido esos mensajes por lotes.
Hay un comando que lee datos de STDIN y los publica en una cola RabbitMQ. Para usarlo primero tienes que configurar un servicio producer
en tu archivo de configuración como este:
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
Ese productor publicará mensajes con las words
intercambio directo. Por supuesto puedes adaptar la configuración a lo que quieras.
Luego digamos que desea publicar el contenido de algunos archivos XML para que sean procesados por una granja de consumidores. Podrías publicarlos simplemente usando un comando como este:
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
Esto significa que puedes componer productores con comandos simples de Unix.
Descompongamos esa línea:
$ find vendor/symfony/ -name " *.xml " -print0
Ese comando encontrará todos los archivos .xml
dentro de la carpeta Symfony e imprimirá el nombre del archivo. Luego, cada uno de esos nombres de archivos se canaliza a cat
a través de xargs
:
$ xargs -0 cat
Y finalmente la salida de cat
va directamente a nuestro productor que se invoca así:
$ ./app/console rabbitmq:stdin-producer words
Solo se necesita un argumento, que es el nombre del productor tal como lo configuró en su archivo config.yml
.
El propósito de este paquete es permitir que su aplicación produzca mensajes y los publique en algunos intercambios que haya configurado.
En algunos casos, e incluso si su configuración es correcta, los mensajes que está generando no se enrutarán a ninguna cola porque no existe ninguna. El consumidor responsable del consumo de la cola debe ejecutarse para que se cree la cola.
Lanzar un comando para cada consumidor puede ser una pesadilla cuando el número de consumidores es elevado.
Para crear intercambios, colas y enlaces a la vez y asegurarse de no perder ningún mensaje, puede ejecutar el siguiente comando:
$ ./app/console rabbitmq:setup-fabric
Cuando lo desee, puede configurar sus consumidores y productores para que asuman que el tejido RabbitMQ ya está definido. Para hacer esto, agregue lo siguiente a su configuración:
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
De forma predeterminada, un consumidor o productor declarará todo lo que necesita con RabbitMQ cuando comience. Tenga cuidado al usar esto, cuando los intercambios o las colas no estén definidas, habrá errores. Cuando haya cambiado cualquier configuración, deberá ejecutar el comando setup-fabric anterior para declarar su configuración.
Para contribuir simplemente abre un Pull Request con tu nuevo código teniendo en cuenta que si agregas nuevas características o modificas las existentes tienes que documentar en este README lo que hacen. Si rompes BC, entonces también debes documentarlo. También tienes que actualizar el CHANGELOG. Entonces:
Ver: recursos/meta/LICENSE.md
La estructura del paquete y la documentación se basan parcialmente en RedisBundle.