Le RabbitMqBundle
intègre la messagerie dans votre application via RabbitMQ en utilisant la bibliothèque php-amqplib.
Le bundle implémente plusieurs modèles de messagerie comme on le voit sur la bibliothèque Thumper. Par conséquent, publier des messages sur RabbitMQ à partir d’un contrôleur Symfony est aussi simple que :
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
Plus tard, lorsque vous souhaitez consommer 50 messages de la file d'attente upload_pictures
, vous exécutez simplement sur la CLI :
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Tous les exemples attendent un serveur RabbitMQ en cours d'exécution.
Ce bundle a été présenté lors de la conférence Symfony Live Paris 2011. Voir les diapositives ici.
En raison des changements radicaux causés par Symfony >=4.4, une nouvelle balise a été publiée, rendant le bundle compatible avec Symfony >=4.4.
Exiger le bundle et ses dépendances avec composer :
$ composer require php-amqplib/rabbitmq-bundle
Enregistrez le forfait :
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
Apprécier !
Si vous disposez d'une application console utilisée pour exécuter les consommateurs RabbitMQ, vous n'avez pas besoin de Symfony HttpKernel et FrameworkBundle. À partir de la version 1.6, vous pouvez utiliser le composant Dependency Injection pour charger cette configuration et ces services de bundle, puis utiliser la commande consumer.
Exigez le bundle dans votre fichier composer.json :
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
Enregistrez l'extension et le code du compilateur :
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
Depuis le 04/06/2012 Certaines options par défaut des échanges déclarés dans la section de configuration « producteurs » ont été modifiées pour correspondre aux valeurs par défaut des échanges déclarés dans la section « consommateurs ». Les paramètres concernés sont :
durable
est passé de false
à true
,auto_delete
est passé de true
à false
.Votre configuration doit être mise à jour si vous comptiez sur les valeurs par défaut précédentes.
Depuis le 24/04/2012, la signature de la méthode ConsumerInterface::execute a changé
Depuis le 03/01/2012, la méthode d'exécution des consommateurs obtient l'intégralité de l'objet de message AMQP et pas seulement le corps. Voir le fichier CHANGELOG pour plus de détails.
Ajoutez la section old_sound_rabbit_mq
dans votre fichier de configuration :
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
Ici, nous configurons le service de connexion et les points de terminaison de message que notre application aura. Dans cet exemple, votre conteneur de services contiendra le service old_sound_rabbit_mq.upload_picture_producer
et old_sound_rabbit_mq.upload_picture_consumer
. Ce dernier s'attend à ce qu'il existe un service appelé upload_picture_service
.
Si vous ne spécifiez pas de connexion pour le client, le client recherchera une connexion avec le même alias. Ainsi, pour notre upload_picture
le conteneur de services recherchera une connexion upload_picture
.
Si vous devez ajouter des arguments de file d'attente facultatifs, vos options de file d'attente peuvent ressembler à ceci :
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
autre exemple avec message TTL de 20 secondes :
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
La valeur de l'argument doit être une liste de types de données et de valeurs. Les types de données valides sont :
S
- CordeI
- EntierD
- DécimalT
- HorodatagesF
-TableauA
- Tableaut
- Bool Adaptez les arguments
selon vos besoins.
Si vous souhaitez lier la file d'attente à des clés de routage spécifiques, vous pouvez la déclarer dans la configuration du producteur ou du consommateur :
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
Dans un environnement Symfony tous les services sont entièrement bootstrapés pour chaque requête, à partir de la version >= 4.3 vous pouvez déclarer un service comme paresseux (Lazy Services). Cet ensemble ne prend toujours pas en charge la nouvelle fonctionnalité Lazy Services, mais vous pouvez définir lazy: true
dans votre configuration de connexion pour éviter les connexions inutiles à votre courtier de messages à chaque requête. Il est extrêmement recommandé d'utiliser des connexions paresseuses pour des raisons de performances, néanmoins l'option paresseuse est désactivée par défaut pour éviter d'éventuelles ruptures dans les applications utilisant déjà ce bundle.
C'est une bonne idée de définir read_write_timeout
sur 2x le battement de coeur pour que votre socket soit ouverte. Si vous ne le faites pas ou si vous utilisez un multiplicateur différent, il y a un risque que le socket consommateur expire.
Veuillez garder à l'esprit que vous pouvez vous attendre à des problèmes si vos tâches s'exécutent généralement plus longtemps que la période de battement de cœur, pour lesquelles il n'existe pas de bonnes solutions (lien). Pensez à utiliser soit une valeur élevée pour le battement de cœur, soit à laisser le battement de cœur désactivé au profit du keepalive
de TCP (côté client et serveur) et de la fonctionnalité graceful_max_execution_timeout
.
Vous pouvez fournir plusieurs hôtes pour une connexion. Cela vous permettra d'utiliser le cluster RabbitMQ avec plusieurs nœuds.
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
Faites attention que vous ne pouvez pas préciser
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
paramètres à chaque hôte séparément.
Parfois, vos informations de connexion doivent être dynamiques. Les paramètres de connexion dynamiques vous permettent de fournir ou de remplacer des paramètres par programmation via un service.
Par exemple, dans un scénario où le paramètre vhost
de la connexion dépend du locataire actuel de votre application en marque blanche et que vous ne voulez pas (ou ne pouvez pas) modifier sa configuration à chaque fois.
Définissez un service sous connection_parameters_provider
qui implémente ConnectionParametersProviderInterface
et ajoutez-le à la configuration connections
appropriée.
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
Exemple de mise en œuvre :
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
Dans ce cas, le paramètre vhost
sera remplacé par la sortie de getVhost()
.
Dans une application de messagerie, le processus qui envoie des messages au courtier est appelé producteur tandis que le processus recevant ces messages est appelé consommateur . Dans votre application, vous en aurez plusieurs que vous pourrez lister sous leurs entrées respectives dans la configuration.
Un producteur sera utilisé pour envoyer des messages au serveur. Dans le modèle AMQP, les messages sont envoyés à un échange , cela signifie que dans la configuration d'un producteur, vous devrez spécifier les options de connexion ainsi que les options d'échange, qui seront généralement le nom de l'échange et son type.
Supposons maintenant que vous souhaitiez traiter les téléchargements d'images en arrière-plan. Après avoir déplacé l'image vers son emplacement final, vous publierez un message sur le serveur avec les informations suivantes :
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
Comme vous pouvez le voir, si dans votre configuration vous avez un producteur appelé upload_picture , alors dans le conteneur de services vous aurez un service appelé old_sound_rabbit_mq.upload_picture_producer .
Outre le message lui-même, la méthode OldSoundRabbitMqBundleRabbitMqProducer#publish()
accepte également un paramètre de clé de routage facultatif et un tableau facultatif de propriétés supplémentaires. Le tableau de propriétés supplémentaires vous permet de modifier les propriétés avec lesquelles un objet PhpAmqpLibMessageAMQPMessage
est construit par défaut. De cette façon, vous pouvez par exemple modifier les en-têtes de l'application.
Vous pouvez utiliser les méthodes setContentType et setDeliveryMode afin de définir respectivement le type de contenu du message et le mode de livraison du message, en remplaçant tout ensemble par défaut dans la section de configuration "producteurs". Si elles ne sont pas remplacées par la configuration des « producteurs » ou par un appel explicite à ces méthodes (comme dans l'exemple ci-dessous), les valeurs par défaut sont text/plain pour le type de contenu et 2 pour le mode de livraison.
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
Si vous devez utiliser une classe personnalisée pour un producteur (qui doit hériter de OldSoundRabbitMqBundleRabbitMqProducer
), vous pouvez utiliser l'option class
:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
La prochaine pièce du puzzle consiste à avoir un consommateur qui retirera le message de la file d'attente et le traitera en conséquence.
Il y a actuellement deux événements émis par le producteur.
Cet événement se produit immédiatement avant la publication du message. C'est un bon point d'ancrage pour effectuer la journalisation finale, la validation, etc. avant d'envoyer le message. Un exemple d'implémentation d'un écouteur :
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Cet événement se produit immédiatement après la publication du message. C'est un bon crochet pour effectuer toute journalisation de confirmation, validation, etc. après avoir réellement envoyé le message. Un exemple d'implémentation d'un écouteur :
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Un consommateur se connectera au serveur et démarrera une boucle en attendant que les messages entrants soient traités. Le comportement qu'il aura dépendra du rappel spécifié pour ce consommateur. Passons en revue la configuration du consommateur ci-dessus :
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
Comme nous le voyons ici, l'option de rappel a une référence à un upload_picture_service . Lorsque le consommateur reçoit un message du serveur, il exécutera ce rappel. Si, à des fins de test ou de débogage, vous devez spécifier un rappel différent, vous pouvez le modifier ici.
Outre le rappel nous précisons également la connexion à utiliser, de la même manière que nous le faisons avec un producteur . Les options restantes sont Exchange_options et Queue_options . Les options d'échange doivent être les mêmes que celles utilisées pour le producteur . Dans queue_options, nous fournirons un nom de file d'attente . Pourquoi?
Comme nous l'avons dit, les messages dans AMQP sont publiés sur un échange . Cela ne signifie pas que le message a atteint une file d'attente . Pour que cela se produise, nous devons d’abord créer une telle file d’attente , puis la lier à l’ échange . Ce qui est intéressant, c'est que vous pouvez lier plusieurs files d'attente à un seul échange , de cette façon un message peut arriver à plusieurs destinations. L’avantage de cette approche est le découplage entre le producteur et le consommateur. Le producteur ne se soucie pas du nombre de consommateurs qui traiteront ses messages. Il suffit que son message parvienne au serveur. De cette façon, nous pouvons étendre les actions que nous effectuons chaque fois qu'une image est téléchargée sans avoir besoin de modifier le code dans notre contrôleur.
Maintenant, comment gérer un consommateur ? Il existe une commande qui peut être exécutée comme ceci :
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Qu'est-ce que cela signifie? Nous exécutons le consommateur upload_picture en lui disant de consommer seulement 50 messages. Chaque fois que le consommateur reçoit un message du serveur, il exécutera le rappel configuré en transmettant le message AMQP en tant qu'instance de la classe PhpAmqpLibMessageAMQPMessage
. Le corps du message peut être obtenu en appelant $msg->body
. Par défaut, le consommateur traitera les messages dans une boucle sans fin pour une certaine définition de unlimited .
Si vous voulez être sûr que le consommateur terminera son exécution instantanément sur le signal Unix, vous pouvez exécuter la commande avec l'indicateur -w
.
$ ./app/console rabbitmq:consumer -w upload_picture
Ensuite, le consommateur terminera son exécution instantanément.
Pour utiliser la commande avec cet indicateur, vous devez installer PHP avec l'extension PCNTL.
Si vous souhaitez établir une limite de mémoire consommateur, vous pouvez le faire en utilisant flag -l
. Dans l'exemple suivant, cet indicateur ajoute une limite de mémoire de 256 Mo. Le consommateur sera arrêté cinq Mo avant d'atteindre 256 Mo afin d'éviter une erreur de taille de mémoire autorisée PHP.
$ ./app/console rabbitmq:consumer -l 256
Si vous souhaitez supprimer tous les messages en attente dans une file d'attente, vous pouvez exécuter cette commande pour purger cette file d'attente :
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
Pour supprimer la file d'attente du consommateur, utilisez cette commande :
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
Cela peut être utile dans de nombreux scénarios. Il existe 3 événements AMQPE :
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` et vérifier le déploiement d'une nouvelle application.
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Événement déclenché avant le traitement d'un AMQPMessage
.
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Événement déclenché après le traitement d'un AMQPMessage
. Si le message de processus lève une exception, l'événement ne sera pas déclenché.
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
Événement déclenché lorsque la méthode wait
se termine par délai d'attente sans recevoir de message. Afin d'utiliser cet événement, un consommateur idle_timeout
doit être configuré. Par défaut, le processus se termine en cas d'expiration du délai d'inactivité, vous pouvez l'empêcher en définissant $event->setForceStop(false)
dans un écouteur.
Si vous devez définir un délai d'expiration lorsqu'il n'y a aucun message de votre file d'attente pendant un certain temps, vous pouvez définir le idle_timeout
en secondes. Le idle_timeout_exit_code
spécifie quel code de sortie doit être renvoyé par le consommateur lorsque le délai d'inactivité se produit. Sans le spécifier, le consommateur lèvera une exception PhpAmqpLibExceptionAMQPTimeoutException
.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
Définissez le timeout_wait
en secondes. timeout_wait
spécifie combien de temps le consommateur attendra sans recevoir de nouveau message avant de s'assurer que la connexion actuelle est toujours valide.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
Si vous souhaitez que votre consommateur s'exécute jusqu'à un certain temps, puis se ferme gracieusement, définissez graceful_max_execution.timeout
en secondes. "Quitter gracieusement" signifie que le consommateur quittera soit après la tâche en cours d'exécution, soit immédiatement, en attendant de nouvelles tâches. Le graceful_max_execution.exit_code
spécifie quel code de sortie doit être renvoyé par le consommateur lorsque le délai d'expiration maximum d'exécution se produit. Sans le préciser, le consommateur quittera avec le statut 0
.
Cette fonctionnalité est idéale en conjonction avec superviseur, qui, ensemble, peuvent permettre un nettoyage périodique des fuites de mémoire, une connexion avec le renouvellement de la base de données/rabbitmq et bien plus encore.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
Vous avez peut-être remarqué que le dispatching ne fonctionne toujours pas exactement comme nous le souhaiterions. Par exemple, dans une situation avec deux travailleurs, lorsque tous les messages impairs sont lourds et les messages pairs sont légers, un travailleur sera constamment occupé et l'autre ne fera pratiquement aucun travail. Eh bien, RabbitMQ n'en sait rien et distribuera toujours les messages de manière uniforme.
Cela se produit parce que RabbitMQ distribue simplement un message lorsque le message entre dans la file d'attente. Il ne prend pas en compte le nombre de messages non reconnus pour un consommateur. Il envoie simplement aveuglément chaque n-ième message au n-ième consommateur.
Afin de vaincre cela, nous pouvons utiliser la méthode basic.qos avec le paramètre prefetch_count=1. Cela indique à RabbitMQ de ne pas donner plus d'un message à un travailleur à la fois. Ou, en d'autres termes, n'envoyez pas de nouveau message à un travailleur tant qu'il n'a pas traité et accusé réception du précédent. Au lieu de cela, il l'enverra au prochain travailleur qui n'est pas encore occupé.
Depuis : http://www.rabbitmq.com/tutorials/tutorial-two-python.html
Soyez prudent car la mise en œuvre de la répartition équitable introduit une latence qui nuira aux performances (voir cet article de blog). Mais sa mise en œuvre vous permet d'évoluer horizontalement de manière dynamique à mesure que la file d'attente augmente. Vous devez évaluer, comme le recommande l'article de blog, la bonne valeur de prefetch_size en fonction du temps nécessaire au traitement de chaque message et des performances de votre réseau.
Avec RabbitMqBundle, vous pouvez configurer ces qos_options par consommateur comme ceci :
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
S'il est utilisé avec Symfony 4.2+, le bundle déclare dans le conteneur un ensemble d'alias pour les producteurs et les consommateurs réguliers. Ceux-ci sont utilisés pour le câblage automatique des arguments en fonction du type déclaré et du nom de l'argument. Cela vous permet de modifier l'exemple de producteur précédent en :
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
Le nom de l'argument est construit à partir du nom du producteur ou du consommateur issu de la configuration et suffixé du mot producteur ou consommateur selon le type. Contrairement aux éléments conteneurs, le suffixe du mot (producteur ou consommateur) ne sera pas dupliqué si le nom est déjà suffixé. La clé du producteur upload_picture
sera remplacée par le nom de l'argument $uploadPictureProducer
. La clé de producteur upload_picture_producer
serait également alias au nom de l'argument $uploadPictureProducer
. Il est préférable d’éviter les noms similaires de cette manière.
Tous les producteurs sont alias OldSoundRabbitMqBundleRabbitMqProducerInterface
et l'option de classe de producteur de la configuration. En mode sandbox, seuls les alias ProducerInterface
sont créés. Il est fortement recommandé d'utiliser la classe ProducerInterface
lors de la saisie d'arguments d'indication pour l'injection du producteur.
Tous les consommateurs portent l'alias de la valeur de l'option de configuration OldSoundRabbitMqBundleRabbitMqConsumerInterface
et %old_sound_rabbit_mq.consumer.class%
. Il n'y a aucune différence entre le mode normal et le mode sandbox. Il est fortement recommandé d'utiliser ConsumerInterface
lors de la saisie d'arguments d'indication pour l'injection client.
Voici un exemple de rappel :
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
Comme vous pouvez le voir, c'est aussi simple que d'implémenter une méthode : ConsumerInterface::execute .
Gardez à l'esprit que vos rappels doivent être enregistrés en tant que services Symfony normaux. Là, vous pouvez injecter le conteneur de services, le service de base de données, le logger Symfony, etc.
Voir https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md pour plus de détails sur ce qui fait partie d'une instance de message.
Pour arrêter le consommateur, le rappel peut lancer StopConsumerException
(le dernier message consommé ne sera pas un accusé de réception) ou AckStopConsumerException
(le message sera un accusé de réception). Si vous utilisez un superviseur diabolisé, par exemple, le consommateur redémarrera en fait.
Cela semble représenter beaucoup de travail pour simplement envoyer des messages, récapitulons pour avoir une meilleure vue d'ensemble. Voici ce dont nous avons besoin pour produire/consommer des messages :
Et c'est tout !
Il s'agissait d'une exigence pour avoir une traçabilité des messages reçus/publiés. Pour activer cela, vous devrez ajouter la configuration enable_logger
aux consommateurs ou aux éditeurs.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
Si vous le souhaitez, vous pouvez également traiter la journalisation des files d'attente avec différents gestionnaires dans monologue, en référençant le canal phpamqplib
.
Jusqu’à présent, nous avons simplement envoyé des messages aux consommateurs, mais que se passe-t-il si nous voulons obtenir une réponse de leur part ? Pour y parvenir, nous devons implémenter les appels RPC dans notre application. Ce bundle permet de réaliser assez facilement de telles choses avec Symfony.
Ajoutons un client et un serveur RPC dans la configuration :
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
Pour une référence de configuration complète, veuillez utiliser la commande php app/console config:dump-reference old_sound_rabbit_mq
.
Nous avons ici un serveur très utile : il renvoie des entiers aléatoires à ses clients. Le rappel utilisé pour traiter la requête sera le service random_int_server . Voyons maintenant comment l'invoquer depuis nos contrôleurs.
Nous devons d’abord démarrer le serveur depuis la ligne de commande :
$ ./app/console_dev rabbitmq:rpc-server random_int
Et puis ajoutez le code suivant à notre contrôleur :
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
Comme vous pouvez le voir ici, si notre identifiant client est integer_store , alors le nom du service sera old_sound_rabbit_mq.integer_store_rpc . Une fois que nous obtenons cet objet, nous plaçons une requête sur le serveur en appelant addRequest
qui attend trois paramètres :
Les arguments que nous envoyons sont les valeurs min et max de la fonction rand()
. Nous les envoyons en sérialisant un tableau. Si notre serveur attend des informations JSON ou XML, nous enverrons ces données ici.
La dernière étape consiste à obtenir la réponse. Notre script PHP bloquera jusqu'à ce que le serveur renvoie une valeur. La variable $replies sera un tableau associatif où chaque réponse du serveur sera contenue dans la clé request_id respective.
Par défaut, le client RPC s'attend à ce que la réponse soit sérialisée. Si le serveur avec lequel vous travaillez renvoie un résultat non sérialisé, définissez l'option expect_serialized_response
du client RPC sur false. Par exemple, si le serveur integer_store ne sérialise pas le résultat, le client sera défini comme ci-dessous :
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
Vous pouvez également définir une expiration pour la demande en millisecondes, après quoi le message ne sera plus traité par le serveur et la demande du client expirera simplement. La définition de l'expiration des messages ne fonctionne que pour RabbitMQ 3.x et versions ultérieures. Visitez http://www.rabbitmq.com/ttl.html#per-message-ttl pour plus d'informations.
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
Comme vous pouvez le deviner, nous pouvons également effectuer des appels RPC parallèles .
Disons que pour afficher une page Web, vous devez effectuer deux requêtes de base de données, l'une prenant 5 secondes et l'autre 2 secondes (requêtes très coûteuses). Si vous les exécutez séquentiellement, votre page sera prête à être livrée dans environ 7 secondes. Si vous les exécutez en parallèle, votre page sera servie en 5 secondes environ. Avec RabbitMqBundle
nous pouvons facilement effectuer de tels appels parallèles. Définissons un client parallèle dans la configuration et un autre serveur RPC :
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
Ensuite, ce code devrait aller dans notre contrôleur :
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
Est très similaire à l’exemple précédent, nous avons juste un appel addRequest
supplémentaire. Nous fournissons également des identifiants de requête significatifs afin qu'il nous soit plus facile de trouver plus tard la réponse souhaitée dans le tableau $replies .
Pour activer la réponse directe aux clients, il vous suffit d'activer l'option direct_reply_to dans la configuration rpc_clients pour le client.
Cette option utilisera la pseudo-file d'attente amq.rabbitmq.reply-to lors des appels RPC. Sur le serveur RPC, aucune modification n'est nécessaire.
RabbitMQ a une implémentation de file d'attente prioritaire dans le noyau à partir de la version 3.5.0. N'importe quelle file d'attente peut être transformée en file d'attente prioritaire à l'aide d'arguments facultatifs fournis par le client (mais contrairement à d'autres fonctionnalités qui utilisent des arguments facultatifs, pas des politiques). L'implémentation prend en charge un nombre limité de priorités : 255. Des valeurs comprises entre 1 et 10 sont recommandées. Vérifier la documentation
voici comment déclarer une file d'attente prioritaire
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
si une file d'attente upload-picture
existe avant, vous devez supprimer cette file d'attente avant d'exécuter la commande rabbitmq:setup-fabric
Supposons maintenant que vous souhaitiez créer un message avec une priorité élevée, vous devez publier le message avec ces informations supplémentaires
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
C'est une bonne pratique d'avoir beaucoup de files d'attente pour la séparation logique. Avec un simple consommateur vous devrez créer un travailleur (consommateur) par file d'attente et cela peut être difficile à gérer face à de nombreuses évolutions (oublier d'ajouter une ligne dans votre configuration superviseur ?). Ceci est également utile pour les petites files d'attente, car vous ne souhaitez peut-être pas avoir autant de travailleurs que de files d'attente et souhaitez regrouper certaines tâches sans perdre la flexibilité et le principe de séparation.
Plusieurs consommateurs vous permettent de gérer ce cas d'utilisation en écoutant plusieurs files d'attente sur le même consommateur.
Voici comment définir un consommateur avec plusieurs files d'attente :
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
Le rappel est désormais spécifié sous chaque file d'attente et doit implémenter la ConsumerInterface
comme un simple consommateur. Toutes les options des queues-options
du consommateur sont disponibles pour chaque file d'attente.
Sachez que toutes les files d'attente sont sous le même central, à vous de définir le bon routage pour les rappels.
queues_provider
est un service facultatif qui fournit dynamiquement des files d'attente. Il doit implémenter QueuesProviderInterface
.
Sachez que les fournisseurs de files d'attente sont responsables des appels appropriés à setDequeuer
et que les rappels sont appelables (et non ConsumerInterface
). Dans le cas où le service fournissant des files d'attente implémente DequeuerAwareInterface
, un appel à setDequeuer
est ajouté à la définition du service avec un DequeuerInterface
étant actuellement un MultipleConsumer
.
Vous constaterez peut-être que votre application a un flux de travail complexe et que vous avez besoin d'une liaison arbitraire. Les scénarios de liaison arbitraires peuvent inclure des liaisons d'échange à échange via la propriété destination_is_exchange
.
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
La commande Rabbitmq:setup-fabric déclarera les échanges et les files d'attente tels que définis dans vos configurations de producteur, de consommateur et de multi-consommateur avant de créer vos liaisons arbitraires. Cependant, Rabbitmq:setup-fabric ne déclarera PAS les files d'attente supplémentaires et les échanges définis dans les liaisons. A vous de vous assurer que les échanges/files d'attente sont déclarés.
Parfois, vous devez modifier la configuration du consommateur à la volée. Les consommateurs dynamiques vous permettent de définir les options de file d'attente des consommateurs par programmation, en fonction du contexte.
Par exemple, dans un scénario où le consommateur défini doit être responsable d'un nombre dynamique de sujets et que vous ne voulez pas (ou ne pouvez pas) modifier sa configuration à chaque fois.
Définissez un service queue_options_provider
qui implémente QueueOptionsProviderInterface
et ajoutez-le à votre configuration dynamic_consumers
.
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
Exemple d'utilisation :
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
Dans ce cas, le consommateur proc_logs
s'exécute pour server1
et peut décider des options de file d'attente qu'il utilise.
Maintenant, pourquoi aurons-nous un jour besoin de consommateurs anonymes ? Cela ressemble à une menace Internet ou quelque chose du genre… Continuez à lire.
Dans AMQP, il existe un type d'échange appelé sujet dans lequel les messages sont acheminés vers des files d'attente en fonction – vous devinez – du sujet du message. Nous pouvons envoyer des journaux concernant notre application à un échange de sujets RabbiMQ en utilisant comme sujet le nom d'hôte où le journal a été créé et la gravité de ce journal. Le corps du message sera le contenu du journal et nos clés de routage ressembleront à ceci :
Puisque nous ne voulons pas remplir les files d'attente avec des journaux illimités, ce que nous pouvons faire, c'est que lorsque nous voulons surveiller le système, nous pouvons lancer un consommateur qui crée une file d'attente et s'attache à l'échange de journaux en fonction d'un sujet, par exemple. , nous aimerions voir toutes les erreurs signalées par nos serveurs. La clé de routage ressemblera à quelque chose comme : #.error . Dans ce cas, nous devons trouver un nom de file d'attente, le lier à l'échange, récupérer les journaux, le dissocier et supprimer la file d'attente. Heureusement, AMPQ fournit un moyen de le faire automatiquement si vous fournissez les bonnes options lorsque vous déclarez et liez la file d'attente. Le problème est que vous ne voulez pas mémoriser toutes ces options. C'est pour cette raison que nous avons implémenté le modèle de consommateur anonyme .
Lorsque nous démarrons un consommateur anonyme, il s'occupera de ces détails et nous n'aurons plus qu'à penser à implémenter le rappel lorsque les messages arrivent. Est-il appelé Anonyme car il ne spécifiera pas de nom de file d'attente, mais il attendra que RabbitMQ lui en attribue un au hasard.
Maintenant, comment configurer et exécuter un tel consommateur ?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
Là, nous spécifions le nom de l'échange et son type ainsi que le rappel qui doit être exécuté lorsqu'un message arrive.
Ce Consommateur Anonyme est désormais en mesure d'écouter les Producteurs, qui sont liés au même échange et de type thématique :
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
Pour démarrer un consommateur anonyme, nous utilisons la commande suivante :
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
La seule nouvelle option par rapport aux commandes que nous avons vues précédemment est celle qui spécifie la clé de routage : -r '#.error'
.
Dans certains cas, vous souhaiterez recevoir un lot de messages, puis effectuer un traitement sur chacun d'eux. Les consommateurs batch vous permettront de définir une logique pour ce type de traitement.
Par exemple : imaginez que vous ayez une file d'attente dans laquelle vous recevez un message pour insérer des informations dans la base de données, et vous réalisez que si vous effectuez une insertion par lots, il est bien préférable de les insérer une par une.
Définissez un service de rappel qui implémente BatchConsumerInterface
et ajoutez la définition du consommateur à votre configuration.
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
Remarque : Si l'option keep_alive
est définie sur true
, idle_timeout_exit_code
sera ignoré et le processus de consommation se poursuivra.
Vous pouvez implémenter un consommateur par lots qui accusera réception de tous les messages en un seul retour ou vous pouvez contrôler le message à accuser réception.
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
Comment exécuter le consommateur par lots suivant :
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
Important : BatchConsumers n'aura pas l'option -m|messages
disponible Important : BatchConsumers peut également avoir l'option -b|batches
disponible si vous souhaitez consommer uniquement un nombre spécifique de lots, puis arrêter le consommateur. Donnez le nombre de lots uniquement si vous souhaitez que le consommateur s'arrête après la consommation de ces messages par lots !
Il existe une commande qui lit les données de STDIN et les publie dans une file d'attente RabbitMQ. Pour l'utiliser, vous devez d'abord configurer un service producer
dans votre fichier de configuration comme ceci :
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
Ce producteur publiera des messages avec les words
échange direct. Bien entendu, vous pouvez adapter la configuration à votre guise.
Supposons ensuite que vous souhaitiez publier le contenu de certains fichiers XML afin qu'ils soient traités par une batterie de consommateurs. Vous pouvez les publier en utilisant simplement une commande comme celle-ci :
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
Cela signifie que vous pouvez composer des producteurs avec des commandes Unix simples.
Décomposons cette doublure :
$ find vendor/symfony/ -name " *.xml " -print0
Cette commande trouvera tous les fichiers .xml
dans le dossier symfony et imprimera le nom du fichier. Chacun de ces noms de fichiers est ensuite redirigé vers cat
via xargs
:
$ xargs -0 cat
Et enfin la sortie de cat
va directement à notre producteur qui est invoqué comme ceci :
$ ./app/console rabbitmq:stdin-producer words
Cela ne prend qu'un seul argument qui est le nom du producteur tel que vous l'avez configuré dans votre fichier config.yml
.
Le but de ce bundle est de permettre à votre application de produire des messages et de les publier sur certains échanges que vous avez configurés.
Dans certains cas et même si votre configuration est correcte, les messages que vous produisez ne seront acheminés vers aucune file d'attente car il n'en existe pas. Le consommateur responsable de la consommation de la file d'attente doit être exécuté pour que la file d'attente soit créée.
Lancer une commande pour chaque consommateur peut s’avérer un véritable cauchemar lorsque le nombre de consommateurs est élevé.
Afin de créer des échanges, des files d'attente et des liaisons en même temps et être sûr de ne perdre aucun message, vous pouvez exécuter la commande suivante :
$ ./app/console rabbitmq:setup-fabric
Si vous le souhaitez, vous pouvez configurer vos consommateurs et producteurs pour qu'ils supposent que la structure RabbitMQ est déjà définie. Pour ce faire, ajoutez ce qui suit à votre configuration :
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
Par défaut, un consommateur ou un producteur déclarera tout ce dont il a besoin avec RabbitMQ au démarrage. Soyez prudent en utilisant ceci, lorsque les échanges ou les files d'attente ne sont pas définis, il y aura des erreurs. Lorsque vous avez modifié une configuration, vous devez exécuter la commande setup-fabric ci-dessus pour déclarer votre configuration.
Pour contribuer, ouvrez simplement une Pull Request avec votre nouveau code en tenant compte du fait que si vous ajoutez de nouvelles fonctionnalités ou modifiez celles existantes, vous devez documenter dans ce README ce qu'elles font. Si vous cassez BC, vous devez également le documenter. Vous devez également mettre à jour le CHANGELOG. Donc:
Voir : resources/meta/LICENSE.md
La structure du bundle et la documentation sont partiellement basées sur le RedisBundle