O RabbitMqBundle
incorpora mensagens em seu aplicativo via RabbitMQ usando a biblioteca php-amqplib.
O pacote implementa vários padrões de mensagens conforme visto na biblioteca Thumper. Portanto, publicar mensagens no RabbitMQ a partir de um controlador Symfony é tão fácil quanto:
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
Mais tarde, quando você quiser consumir 50 mensagens da fila upload_pictures
, basta executar na CLI:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Todos os exemplos esperam um servidor RabbitMQ em execução.
Este pacote foi apresentado na conferência Symfony Live Paris 2011. Veja os slides aqui.
Devido às alterações significativas causadas pelo Symfony >=4.4, uma nova tag foi lançada, tornando o pacote compatível com Symfony >=4.4.
Exija o pacote e suas dependências com o compositor:
$ composer require php-amqplib/rabbitmq-bundle
Registre o pacote:
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
Aproveitar !
Se você tiver um aplicativo de console usado para executar consumidores RabbitMQ, não precisará do Symfony HttpKernel e do FrameworkBundle. A partir da versão 1.6, você pode usar o componente Injeção de Dependência para carregar a configuração e os serviços desse pacote configurável e, em seguida, usar o comando do consumidor.
Exija o pacote em seu arquivo compositor.json:
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
Registre a extensão e o passo do compilador:
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
Desde 04/06/2012, algumas opções padrão para trocas declaradas na seção de configuração "produtores" foram alteradas para corresponder aos padrões das trocas declaradas na seção "consumidores". As configurações afetadas são:
durable
foi alterado de false
para true
,auto_delete
foi alterado de true
para false
.Sua configuração deverá ser atualizada se você estivesse confiando nos valores padrão anteriores.
Desde 24/04/2012, a assinatura do método ConsumerInterface::execute foi alterada
Desde 03/01/2012, o método de execução do consumidor obtém todo o objeto da mensagem AMQP e não apenas o corpo. Consulte o arquivo CHANGELOG para obter mais detalhes.
Adicione a seção old_sound_rabbit_mq
em seu arquivo de configuração:
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
Aqui configuramos o serviço de conexão e os endpoints de mensagens que nossa aplicação terá. Neste exemplo, seu contêiner de serviço conterá o serviço old_sound_rabbit_mq.upload_picture_producer
e old_sound_rabbit_mq.upload_picture_consumer
. O último espera que exista um serviço chamado upload_picture_service
.
Se você não especificar uma conexão para o cliente, o cliente procurará uma conexão com o mesmo alias. Portanto, para nosso upload_picture
o contêiner de serviço procurará uma conexão upload_picture
.
Se você precisar adicionar argumentos de fila opcionais, suas opções de fila poderão ser mais ou menos assim:
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
outro exemplo com mensagem TTL de 20 segundos:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
O valor do argumento deve ser uma lista de tipos de dados e valores. Os tipos de dados válidos são:
S
- CordaI
- inteiroD
- DecimaisT
- Carimbos de data e horaF
- TabelaA
- Matrizt
-bool Adapte os arguments
de acordo com suas necessidades.
Se você deseja vincular a fila com chaves de roteamento específicas, você pode declará-la na configuração do produtor ou do consumidor:
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
Em um ambiente Symfony todos os serviços são totalmente inicializados para cada solicitação, a partir da versão >= 4.3 você pode declarar um serviço como lento (Lazy Services). Este pacote ainda não oferece suporte ao novo recurso Lazy Services, mas você pode definir lazy: true
na configuração de sua conexão para evitar conexões desnecessárias com seu agente de mensagens em cada solicitação. É extremamente recomendado usar conexões lentas por motivos de desempenho, porém a opção lenta está desabilitada por padrão para evitar possíveis quebras em aplicações que já utilizam este pacote.
É uma boa ideia definir read_write_timeout
para 2x a pulsação para que seu soquete fique aberto. Se você não fizer isso ou usar um multiplicador diferente, existe o risco de o soquete do consumidor atingir o tempo limite.
Tenha em mente que você pode esperar problemas se suas tarefas geralmente durarem mais do que o período de pulsação, para o qual não há boas soluções (link). Considere usar um valor grande para a pulsação ou deixe a pulsação desabilitada em favor do keepalive
do tcp (tanto no lado do cliente quanto no servidor) e do recurso graceful_max_execution_timeout
.
Você pode fornecer vários hosts para uma conexão. Isso permitirá que você use o cluster RabbitMQ com vários nós.
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
Preste atenção que você não pode especificar
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
parâmetros para cada host separadamente.
Às vezes, suas informações de conexão podem precisar ser dinâmicas. Os parâmetros de conexão dinâmica permitem fornecer ou substituir parâmetros programaticamente por meio de um serviço.
por exemplo, em um cenário em que o parâmetro vhost
da conexão depende do locatário atual do seu aplicativo de marca branca e você não deseja (ou não pode) alterar sua configuração todas as vezes.
Defina um serviço em connection_parameters_provider
que implemente ConnectionParametersProviderInterface
e adicione-o à configuração connections
apropriada.
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
Exemplo de implementação:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
Neste caso, o parâmetro vhost
será substituído pela saída de getVhost()
.
Em um aplicativo de mensagens, o processo que envia mensagens ao intermediário é denominado produtor , enquanto o processo que recebe essas mensagens é denominado consumidor . Em seu aplicativo você terá vários deles que você pode listar em suas respectivas entradas na configuração.
Um produtor será usado para enviar mensagens ao servidor. No Modelo AMQP as mensagens são enviadas para uma exchange , isso significa que na configuração de um produtor você terá que especificar as opções de conexão junto com as opções de exchange, que normalmente será o nome da exchange e o tipo dela.
Agora digamos que você deseja processar uploads de imagens em segundo plano. Após mover a imagem para seu local final, você publicará uma mensagem no servidor com as seguintes informações:
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
Como você pode ver, se em sua configuração você tiver um produtor chamado upload_picture , então no contêiner de serviço você terá um serviço chamado old_sound_rabbit_mq.upload_picture_producer .
Além da mensagem em si, o método OldSoundRabbitMqBundleRabbitMqProducer#publish()
também aceita um parâmetro de chave de roteamento opcional e uma matriz opcional de propriedades adicionais. A matriz de propriedades adicionais permite alterar as propriedades com as quais um objeto PhpAmqpLibMessageAMQPMessage
é construído por padrão. Dessa forma, por exemplo, você pode alterar os cabeçalhos do aplicativo.
Você pode usar os métodos setContentType e setDeliveryMode para definir o tipo de conteúdo da mensagem e o modo de entrega da mensagem respectivamente, substituindo qualquer padrão definido na seção de configuração "produtores". Se não for substituído pela configuração de "produtores" ou por uma chamada explícita para esses métodos (conforme o exemplo abaixo), os valores padrão serão text/plain para tipo de conteúdo e 2 para modo de entrega.
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
Se você precisar usar uma classe personalizada para um produtor (que deve herdar de OldSoundRabbitMqBundleRabbitMqProducer
), você pode usar a opção de class
:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
A próxima peça do quebra-cabeça é ter um consumidor que retire a mensagem da fila e a processe adequadamente.
Atualmente existem dois eventos emitidos pela produtora.
Este evento ocorre imediatamente antes da publicação da mensagem. Este é um bom gancho para fazer qualquer registro final, validação, etc. antes de realmente enviar a mensagem. Um exemplo de implementação de um ouvinte:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Este evento ocorre imediatamente após a publicação da mensagem. Este é um bom gancho para fazer qualquer registro de confirmação, commits, etc. depois de realmente enviar a mensagem. Um exemplo de implementação de um ouvinte:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Um consumidor se conectará ao servidor e iniciará um loop aguardando o processamento das mensagens recebidas. Dependendo do retorno de chamada especificado para tal consumidor será o comportamento que ele terá. Vamos revisar a configuração do consumidor acima:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
Como vemos lá, a opção de retorno de chamada tem uma referência a upload_picture_service . Quando o consumidor receber uma mensagem do servidor ele executará tal retorno de chamada. Se, para fins de teste ou depuração, você precisar especificar um retorno de chamada diferente, poderá alterá-lo lá.
Além do retorno de chamada, também especificamos a conexão a ser usada, da mesma forma que fazemos com um produtor . As opções restantes são exchange_options e queue_options . As exchange_options devem ser as mesmas usadas para o produtor . Em queue_options forneceremos um nome de fila . Por que?
Como dissemos, as mensagens no AMQP são publicadas em uma exchange . Isso não significa que a mensagem chegou a uma fila . Para que isso aconteça, primeiro precisamos criar essa fila e depois vinculá-la à exchange . O legal disso é que você pode vincular várias filas a uma exchange , dessa forma uma mensagem pode chegar a vários destinos. A vantagem desta abordagem é a dissociação entre o produtor e o consumidor. O produtor não se importa com quantos consumidores processarão suas mensagens. Basta que sua mensagem chegue ao servidor. Desta forma podemos ampliar as ações que realizamos cada vez que uma imagem é carregada sem a necessidade de alterar o código em nosso controlador.
Agora, como administrar um consumidor? Existe um comando para isso que pode ser executado assim:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
O que isto significa? Estamos executando o consumidor upload_picture informando-o para consumir apenas 50 mensagens. Cada vez que o consumidor receber uma mensagem do servidor, ele executará o callback configurado passando a mensagem AMQP como uma instância da classe PhpAmqpLibMessageAMQPMessage
. O corpo da mensagem pode ser obtido chamando $msg->body
. Por padrão, o consumidor processará mensagens em um loop infinito para alguma definição de infinito .
Se quiser ter certeza de que o consumidor terminará a execução instantaneamente no sinal Unix, você pode executar o comando com o sinalizador -w
.
$ ./app/console rabbitmq:consumer -w upload_picture
Então o consumidor terminará a execução instantaneamente.
Para usar o comando com este sinalizador você precisa instalar o PHP com extensão PCNTL.
Se quiser estabelecer um limite de memória para o consumidor, você pode fazer isso usando o sinalizador -l
. No exemplo a seguir, esse sinalizador adiciona um limite de memória de 256 MB. O consumidor será interrompido cinco MB antes de atingir 256 MB para evitar um erro de tamanho de memória permitido pelo PHP.
$ ./app/console rabbitmq:consumer -l 256
Se quiser remover todas as mensagens que aguardam em uma fila, você pode executar este comando para limpar esta fila:
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
Para excluir a fila do consumidor, use este comando:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
Isso pode ser útil em muitos cenários. Existem 3 eventos AMQPE:
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` e verificar a implantação de um novo aplicativo.
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Evento gerado antes de processar um AMQPMessage
.
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Evento gerado após o processamento de um AMQPMessage
. Se a mensagem do processo lançar uma exceção, o evento não será gerado.
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
Evento gerado quando o método wait
sai por tempo limite sem receber uma mensagem. Para utilizar este evento é necessário configurar um consumidor idle_timeout
. Por padrão, a saída do processo no tempo limite de inatividade, você pode evitá-lo definindo $event->setForceStop(false)
em um ouvinte.
Se precisar definir um tempo limite quando não houver mensagens da sua fila durante um período de tempo, você poderá definir idle_timeout
em segundos. idle_timeout_exit_code
especifica qual código de saída deve ser retornado pelo consumidor quando ocorrer o tempo limite de inatividade. Sem especificá-lo, o consumidor lançará uma exceção PhpAmqpLibExceptionAMQPTimeoutException
.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
Defina o timeout_wait
em segundos. O timeout_wait
especifica quanto tempo o consumidor esperará sem receber uma nova mensagem antes de garantir que a conexão atual ainda seja válida.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
Se você quiser que seu consumidor esteja funcionando até um certo tempo e depois saia normalmente, defina graceful_max_execution.timeout
em segundos. "Sair normalmente" significa que o consumidor sairá após a tarefa atualmente em execução ou imediatamente, ao aguardar novas tarefas. O graceful_max_execution.exit_code
especifica qual código de saída deve ser retornado pelo consumidor quando ocorrer o tempo limite máximo de execução do Graceful. Sem especificá-lo, o consumidor sairá com status 0
.
Esse recurso é ótimo em conjunto com o supervisord, que juntos podem permitir a limpeza periódica de vazamentos de memória, conexão com renovação de banco de dados/rabbitmq e muito mais.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
Você deve ter notado que o despacho ainda não funciona exatamente como desejamos. Por exemplo, numa situação com dois trabalhadores, quando todas as mensagens ímpares são pesadas e as mensagens pares são leves, um trabalhador estará constantemente ocupado e o outro dificilmente realizará qualquer trabalho. Bem, o RabbitMQ não sabe nada sobre isso e ainda enviará mensagens uniformemente.
Isso acontece porque o RabbitMQ apenas despacha uma mensagem quando ela entra na fila. Ele não analisa o número de mensagens não confirmadas de um consumidor. Ele apenas despacha cegamente cada enésima mensagem para o n-ésimo consumidor.
Para derrotar isso, podemos usar o método basic.qos com a configuração prefetch_count=1. Isso diz ao RabbitMQ para não fornecer mais de uma mensagem a um trabalhador por vez. Ou, em outras palavras, não envie uma nova mensagem para um trabalhador até que ele tenha processado e confirmado a anterior. Em vez disso, ele irá despachá-lo para o próximo trabalhador que ainda não esteja ocupado.
De: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
Tenha cuidado, pois a implementação do despacho justo introduz uma latência que prejudicará o desempenho (veja esta postagem do blog). Mas implementá-lo permite escalar horizontalmente de forma dinâmica à medida que a fila aumenta. Você deve avaliar, como recomenda a postagem do blog, o valor correto de prefetch_size de acordo com o tempo necessário para processar cada mensagem e o desempenho da sua rede.
Com RabbitMqBundle, você pode configurar qos_options por consumidor assim:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
Se usado com Symfony 4.2+, o pacote declara no contêiner um conjunto de aliases para produtores e consumidores regulares. Eles são usados para ligação automática de argumentos com base no tipo declarado e no nome do argumento. Isso permite que você altere o exemplo anterior do produtor para:
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
O nome do argumento é construído a partir do nome do produtor ou consumidor da configuração e sufixado com a palavra do produtor ou consumidor de acordo com o tipo. Em contraste com os itens do contêiner, o sufixo da palavra da convenção de nomenclatura (produtor ou consumidor) não será duplicado se o nome já estiver com sufixo. A chave do produtor upload_picture
será alterada para o nome do argumento $uploadPictureProducer
. A chave do produtor upload_picture_producer
também teria o alias do nome do argumento $uploadPictureProducer
. É melhor evitar nomes semelhantes dessa maneira.
Todos os produtores têm alias para OldSoundRabbitMqBundleRabbitMqProducerInterface
e opção de classe de produtor na configuração. No modo sandbox, apenas aliases ProducerInterface
são criados. É altamente recomendado usar a classe ProducerInterface
ao digitar argumentos de sugestão para injeção do produtor.
Todos os consumidores têm alias para OldSoundRabbitMqBundleRabbitMqConsumerInterface
e valor da opção de configuração %old_sound_rabbit_mq.consumer.class%
. Não há diferença entre o modo normal e o modo sandbox. É altamente recomendado usar ConsumerInterface
ao digitar argumentos de sugestão para injeção de cliente.
Aqui está um exemplo de retorno de chamada:
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
Como você pode ver, isso é tão simples quanto implementar um método: ConsumerInterface::execute .
Tenha em mente que seus retornos de chamada precisam ser registrados como serviços normais do Symfony. Lá você pode injetar o contêiner de serviço, o serviço de banco de dados, o criador de logs do Symfony e assim por diante.
Consulte https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md para obter mais detalhes sobre o que faz parte de uma instância de mensagem.
Para parar o consumidor, o retorno de chamada pode lançar StopConsumerException
(a última mensagem consumida não será confirmada) ou AckStopConsumerException
(a mensagem será confirmada). Se usar demonizado, ex: supervisor, o consumidor irá realmente reiniciar.
Parece dar muito trabalho apenas enviar mensagens, vamos recapitular para ter uma visão geral melhor. Isto é o que precisamos para produzir/consumir mensagens:
E é isso!
Este era um requisito para ter rastreabilidade das mensagens recebidas/publicadas. Para habilitar isso, você precisará adicionar a configuração enable_logger
aos consumidores ou editores.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
Se desejar, você também pode tratar o log de filas com diferentes manipuladores no monolog, referenciando o canal phpamqplib
.
Até agora apenas enviamos mensagens aos consumidores, mas e se quisermos obter uma resposta deles? Para conseguir isso, temos que implementar chamadas RPC em nosso aplicativo. Este pacote torna muito fácil conseguir essas coisas com o Symfony.
Vamos adicionar um cliente e servidor RPC à configuração:
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
Para obter uma referência completa de configuração, use o comando php app/console config:dump-reference old_sound_rabbit_mq
.
Aqui temos um servidor muito útil: ele retorna números inteiros aleatórios aos seus clientes. O retorno de chamada usado para processar a solicitação será o serviço random_int_server . Agora vamos ver como invocá-lo em nossos controladores.
Primeiro temos que iniciar o servidor a partir da linha de comando:
$ ./app/console_dev rabbitmq:rpc-server random_int
E então adicione o seguinte código ao nosso controlador:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
Como você pode ver aí, se nosso ID de cliente for integer_store , então o nome do serviço será old_sound_rabbit_mq.integer_store_rpc . Assim que obtivermos esse objeto, colocamos uma solicitação no servidor chamando addRequest
que espera três parâmetros:
Os argumentos que estamos enviando são os valores mínimo e máximo da função rand()
. Nós os enviamos serializando um array. Se nosso servidor espera informações JSON, ou XML, enviaremos esses dados aqui.
A peça final é obter a resposta. Nosso script PHP será bloqueado até que o servidor retorne um valor. A variável $replies será um array associativo onde cada resposta do servidor estará contida na respectiva chave request_id .
Por padrão, o Cliente RPC espera que a resposta seja serializada. Se o servidor com o qual você está trabalhando retornar um resultado não serializado, defina a opção expect_serialized_response
do cliente RPC como false. Por exemplo, se o servidor integer_store não serializasse o resultado, o cliente seria definido conforme abaixo:
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
Você também pode definir uma expiração para a solicitação em milissegundos, após a qual a mensagem não será mais tratada pelo servidor e a solicitação do cliente simplesmente expirará. Definir a expiração para mensagens funciona apenas para RabbitMQ 3.xe superior. Visite http://www.rabbitmq.com/ttl.html#per-message-ttl para obter mais informações.
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
Como você pode imaginar, também podemos fazer chamadas RPC paralelas .
Digamos que para renderizar alguma página web, você precise realizar duas consultas ao banco de dados, uma demorando 5 segundos para ser concluída e a outra demorando 2 segundos –consultas muito caras–. Se você executá-los sequencialmente, sua página estará pronta para entrega em cerca de 7 segundos. Se você executá-los em paralelo, sua página será veiculada em cerca de 5 segundos. Com RabbitMqBundle
podemos fazer essas chamadas paralelas com facilidade. Vamos definir um cliente paralelo na configuração e outro servidor RPC:
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
Então este código deve ir para nosso controlador:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
É muito semelhante ao exemplo anterior, apenas temos uma chamada addRequest
extra. Também fornecemos identificadores de solicitação significativos para que mais tarde seja mais fácil encontrar a resposta que desejamos no array $replies .
Para habilitar clientes de resposta direta basta habilitar a opção direct_reply_to na configuração rpc_clients do cliente.
Esta opção usará a pseudo-fila amq.rabbitmq.reply-to ao fazer chamadas RPC. No servidor RPC não há necessidade de modificação.
RabbitMQ tem implementação de fila prioritária no núcleo a partir da versão 3.5.0. Qualquer fila pode ser transformada em prioritária usando argumentos opcionais fornecidos pelo cliente (mas, ao contrário de outros recursos que usam argumentos opcionais, não políticas). A implementação apoia um número limitado de prioridades: 255. São recomendados valores entre 1 e 10. Verifique a documentação
aqui está como você pode declarar uma fila prioritária
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
se a fila upload-picture
existir antes, você deve excluir esta fila antes de executar o comando rabbitmq:setup-fabric
Agora digamos que você queira fazer uma mensagem com alta prioridade, você tem que publicar a mensagem com essas informações adicionais
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
É uma boa prática ter muitas filas para separação lógica. Com um consumidor simples você terá que criar um trabalhador (consumidor) por fila e pode ser difícil de gerenciar ao lidar com muitas evoluções (esqueceu de adicionar uma linha na sua configuração de supervisão?). Isso também é útil para filas pequenas, pois você pode não querer ter tantos trabalhadores quanto filas e querer reagrupar algumas tarefas sem perder a flexibilidade e o princípio de separação.
Vários consumidores permitem que você lide com esse caso de uso ouvindo diversas filas no mesmo consumidor.
Veja como você pode definir um consumidor com múltiplas filas:
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
O retorno de chamada agora é especificado em cada fila e deve implementar ConsumerInterface
como um consumidor simples. Todas as opções de queues-options
no consumidor estão disponíveis para cada fila.
Esteja ciente de que todas as filas estão na mesma central, cabe a você definir o roteamento correto para retornos de chamada.
O queues_provider
é um serviço opcional que fornece filas dinamicamente. Deve implementar QueuesProviderInterface
.
Esteja ciente de que os provedores de filas são responsáveis pelas chamadas adequadas para setDequeuer
e que os retornos de chamada podem ser chamados (não ConsumerInterface
). Caso as filas de fornecimento de serviço implementem DequeuerAwareInterface
, uma chamada para setDequeuer
é adicionada à definição do serviço com um DequeuerInterface
atualmente sendo um MultipleConsumer
.
Você pode descobrir que seu aplicativo tem um fluxo de trabalho complexo e precisa de uma ligação arbitrária. Cenários de ligação arbitrária podem incluir troca para troca de ligações por meio da propriedade destination_is_exchange
.
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
O comando Rabbitmq:setup-fabric declarará trocas e filas conforme definido em suas configurações de produtor, consumidor e multiconsumidor antes de criar suas ligações arbitrárias. No entanto, o RabbitMQ:setup-fabric NÃO declarará filas de adição e trocas definidas nas ligações. Cabe a você garantir que as trocas/filas sejam declaradas.
Às vezes você precisa alterar a configuração do consumidor na hora. Os consumidores dinâmicos permitem definir as opções de fila dos consumidores de forma programática, com base no contexto.
por exemplo, em um cenário em que o consumidor definido deve ser responsável por um número dinâmico de tópicos e você não deseja (ou não pode) alterar sua configuração todas as vezes.
Defina um serviço queue_options_provider
que implemente QueueOptionsProviderInterface
e adicione-o à configuração dynamic_consumers
.
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
Exemplo de uso:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
Nesse caso, o consumidor proc_logs
é executado para server1
e pode decidir sobre as opções de fila que utiliza.
Agora, por que precisaremos de consumidores anônimos? Isso parece alguma ameaça da Internet ou algo assim… Continue lendo.
No AMQP existe um tipo de troca chamada tópico onde as mensagens são roteadas para filas com base –você adivinha– no tópico da mensagem. Podemos enviar logs sobre nossa aplicação para uma troca de tópicos RabbiMQ usando como tópico o nome do host onde o log foi criado e a gravidade de tal log. O corpo da mensagem será o conteúdo do log e nossas chaves de roteamento serão assim:
Como não queremos ficar enchendo filas com logs ilimitados o que podemos fazer é que quando quisermos monitorar o sistema, podemos lançar um consumidor que cria uma fila e anexa a troca de logs com base em algum tópico, por exemplo , gostaríamos de ver todos os erros reportados pelos nossos servidores. A chave de roteamento será algo como: #.error . Nesse caso, temos que criar um nome de fila, vinculá-lo à exchange, obter os logs, desvinculá-lo e excluir a fila. Felizmente, o AMPQ oferece uma maneira de fazer isso automaticamente se você fornecer as opções corretas ao declarar e vincular a fila. O problema é que você não quer se lembrar de todas essas opções. Por isso implementamos o padrão Consumidor Anônimo .
Quando iniciamos um Consumidor Anônimo, ele cuidará desses detalhes e só nos resta pensar em implementar o callback para quando as mensagens chegarem. É chamado de Anônimo porque não especifica um nome de fila, mas espera que o RabbitMQ atribua um nome aleatório a ela.
Agora, como configurar e rodar tal consumidor?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
Lá especificamos o nome e o tipo da exchange junto com o callback que deve ser executado quando uma mensagem chegar.
Este Consumidor Anônimo agora pode ouvir Produtores, que estão vinculados à mesma exchange e do tipo topic :
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
Para iniciar um Consumidor Anônimo usamos o seguinte comando:
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
A única opção nova em comparação com os comandos que vimos antes é aquela que especifica a chave de roteamento : -r '#.error'
.
Em alguns casos, você desejará receber um lote de mensagens e, em seguida, processar todas elas. Os consumidores em lote permitirão definir a lógica para esse tipo de processamento.
ex.: Imagine que você tem uma fila onde recebe uma mensagem para inserção de alguma informação no banco de dados, e você percebe que se fizer uma inserção em lote é muito melhor do que inserir uma por uma.
Defina um serviço de retorno de chamada que implemente BatchConsumerInterface
e adicione a definição do consumidor à sua configuração.
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
Nota : Se a opção keep_alive
estiver definida como true
, idle_timeout_exit_code
será ignorado e o processo do consumidor continuará.
Você pode implementar um consumidor em lote que reconhecerá todas as mensagens em um retorno ou poderá ter controle sobre qual mensagem confirmar.
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
Como executar o seguinte consumidor em lote:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
Importante: BatchConsumers não terá a opção -m|messages
disponível. Importante: BatchConsumers também poderá ter a opção -b|batches
disponível se desejar consumir apenas um número específico de lotes e depois parar o consumidor. Forneça o número de lotes apenas se desejar que o consumidor pare após o consumo dessas mensagens em lote!
Existe um comando que lê dados do STDIN e os publica em uma fila RabbitMQ. Para usá-lo primeiro você deve configurar um serviço producer
em seu arquivo de configuração assim:
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
Esse produtor publicará mensagens nas words
troca direta. Claro que você pode adaptar a configuração como quiser.
Então, digamos que você queira publicar o conteúdo de alguns arquivos XML para que sejam processados por um farm de consumidores. Você poderia publicá-los usando apenas um comando como este:
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
Isso significa que você pode compor produtores com comandos Unix simples.
Vamos decompor esse forro:
$ find vendor/symfony/ -name " *.xml " -print0
Esse comando encontrará todos os arquivos .xml
dentro da pasta symfony e imprimirá o nome do arquivo. Cada um desses nomes de arquivo é então canalizado para cat
via xargs
:
$ xargs -0 cat
E finalmente a saída de cat
vai diretamente para o nosso produtor que é invocado assim:
$ ./app/console rabbitmq:stdin-producer words
Leva apenas um argumento que é o nome do produtor conforme você o configurou em seu arquivo config.yml
.
O objetivo deste pacote é permitir que seu aplicativo produza mensagens e publique-as em algumas trocas que você configurou.
Em alguns casos e mesmo que sua configuração esteja correta, as mensagens que você está produzindo não serão roteadas para nenhuma fila porque não existe nenhuma. O consumidor responsável pelo consumo da fila deve ser executado para que a fila seja criada.
Lançar um comando para cada consumidor pode ser um pesadelo quando o número de consumidores é alto.
Para criar trocas, filas e ligações de uma só vez e ter certeza de não perder nenhuma mensagem, você pode executar o seguinte comando:
$ ./app/console rabbitmq:setup-fabric
Quando desejar, você pode configurar seus consumidores e produtores para assumir que a malha RabbitMQ já está definida. Para fazer isso, adicione o seguinte à sua configuração:
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
Por padrão, um consumidor ou produtor irá declarar tudo o que precisa com o RabbitMQ quando ele for iniciado. Tenha cuidado ao usar isso, quando trocas ou filas não estiverem definidas, haverá erros. Ao alterar qualquer configuração, você precisa executar o comando setup-fabric acima para declarar sua configuração.
Para contribuir basta abrir um Pull Request com seu novo código levando em consideração que se você adicionar novos recursos ou modificar os existentes deverá documentar neste README o que eles fazem. Se você quebrar o BC, também deverá documentá-lo. Além disso, você deve atualizar o CHANGELOG. Então:
Veja: recursos/meta/LICENSE.md
A estrutura do pacote e a documentação são parcialmente baseadas no RedisBundle