Cette bibliothèque est une implémentation PHP pure du protocole AMQP 0-9-1. Il a été testé contre RabbitMQ.
La bibliothèque a été utilisée pour les exemples PHP de RabbitMQ en action et les didacticiels officiels RabbitMQ.
Veuillez noter que ce projet est publié avec un code de conduite de contributeur. En participant à ce projet, vous acceptez de respecter ses conditions.
Merci à Videlalvaro et PostalService14 pour avoir créé php-amqplib
.
Le package est maintenant entretenu par Ramūnas Dronga, Luke Bakken et plusieurs ingénieurs VMware travaillant sur Rabbitmq.
En commençant par la version 2.0, cette bibliothèque utilise AMQP 0.9.1
par défaut et nécessite donc la version RabbitMQ 2.0 ou ultérieure. Habituellement, les mises à niveau du serveur ne nécessitent aucune modification de code d'application, car le protocole modifie très rarement, mais veuillez effectuer vos propres tests avant la mise à niveau.
Puisque la bibliothèque utilise AMQP 0.9.1
nous avons ajouté un support pour les extensions de lapin suivantes:
Échange pour échanger les liaisons
Nack de base
L'éditeur confirme
Consumer annuler la notification
Les extensions qui modifient les méthodes existantes telles que alternate exchanges
sont également prises en charge.
Enqueue / AMQP-lib est un wrapper compatible AMQP compatible.
Amqproxy est une bibliothèque proxy avec connexion et regroupement / réutilisation des canaux. Cela permet une connexion plus faible et un désabonnement de canal lors de l'utilisation de PHP-AMQPLIB, conduisant à moins d'utilisation du CPU de RabbitMQ.
Assurez-vous que le compositeur a installé, puis exécutez la commande suivante:
$ compositeur nécessite PHP-AMQPLIB / PHP-AMQPIB
Cela va chercher la bibliothèque et ses dépendances à l'intérieur de votre dossier de fournisseur. Ensuite, vous pouvez ajouter ce qui suit à vos fichiers .php afin d'utiliser la bibliothèque
require_once __dir __. '/ vendeur / autoload.php';
Ensuite, vous devez use
les classes pertinentes, par exemple:
Utilisez PHPAMQPLIBConnectionAmqPStreAmConnection; Utilisez PHPAMQPLIBMESSAGEAMQPMESSAGE;
Avec Rabbitmq en cours d'exécution, deux terminaux et sur le premier exécutent les commandes suivantes pour démarrer le consommateur:
$ CD PHP-AMQPLIB / Demo $ php amqp_consumer.php
Ensuite, sur l'autre terminal, faites:
$ CD PHP-AMQPLIB / Demo $ php amqp_publisher.php un texte à publier
Vous devriez voir le message arriver au processus sur l'autre terminal
Ensuite, pour arrêter le consommateur, envoyez-lui le message quit
:
$ php amqp_publisher.php quitte
Si vous avez besoin d'écouter les prises utilisées pour vous connecter à RabbitMQ, voyez l'exemple dans le consommateur non bloquant.
$ php amqp_consumer_non_blocking.php
Veuillez consulter Changelog pour plus d'informations ce qui a changé récemment.
http://php-amqplib.github.io/php-amqplib/
Pour ne pas nous répéter, si vous voulez en savoir plus sur cette bibliothèque, veuillez vous référer aux didacticiels officiels RabbitMQ.
amqp_ha_consumer.php
: Demos L'utilisation de files d'attente en miroir.
amqp_consumer_exclusive.php
et amqp_publisher_exclusive.php
: démos fans échanges à l'aide de files d'attente exclusives.
amqp_consumer_fanout_{1,2}.php
et amqp_publisher_fanout.php
: Demos Fanout Échange avec des files d'attente nommées.
amqp_consumer_pcntl_heartbeat.php
: Utilisation de l'expéditeur de rythmes cardiaques basée sur le signal de démos.
basic_get.php
: démos obtenant des messages à partir des files d'attente en utilisant l'appel AMQP BASIC GET .
Si vous avez un groupe de plusieurs nœuds auxquels votre application peut se connecter, vous pouvez démarrer une connexion avec un tableau d'hôtes. Pour ce faire, vous devez utiliser la méthode statique create_connection
.
Par exemple:
$ connection = amqpStreamConnection :: create_connection ([ ['host' => host1, 'port' => port, 'utilisateur' => utilisateur, 'password' => pass, 'vhost' => vhost], ['host' => host2, 'port' => port, 'utilisateur' => utilisateur, 'mot de passe' => pass, 'vhost' => vhost] ], $ options);
Ce code essaiera d'abord de se connecter à HOST1
et de se connecter à HOST2
si la première connexion échoue. La méthode renvoie un objet de connexion pour la première connexion réussie. Si toutes les connexions échouent, il lèvera l'exception de la dernière tentative de connexion.
Voir demo/amqp_connect_multiple_hosts.php
pour plus d'exemples.
Disons que vous avez un processus qui génère un tas de messages qui seront publiés sur le même exchange
en utilisant le même routing_key
et des options comme mandatory
. Ensuite, vous pouvez utiliser la fonctionnalité de bibliothèque batch_basic_publish
. Vous pouvez parcourir des messages comme ceci:
$ msg = new AmqpMessage ($ msg_body); $ ch-> batch_basic_publish ($ msg, $ échange); $ msg2 = new AmqpMessage ($ msg_body); $ ch-> batch_basic_publish ($ msg2, $ exchange);
puis envoyez le lot comme ceci:
$ ch-> publih_batch ();
Disons que notre programme doit lire à partir d'un fichier, puis publier un message par ligne. Selon la taille du message, vous devrez décider quand il est préférable d'envoyer le lot. Vous pouvez l'envoyer tous les 50 messages, ou tous les cent. Cela dépend de vous.
Une autre façon d'accélérer votre publication de messages est de réutiliser les instances de message AMQPMessage
. Vous pouvez créer votre nouveau message comme ceci:
$ Properties = Array ('Content_Type' => 'Text / PLAW', 'Delivery_Mode' => AMQPMESSAGE :: Livrot_Mode_Persistent); $ msg = new AmqPMessage ($ body, $ Properties); $ ch-> basic_publish ($ msg, $ échange);
Disons maintenant que même si vous souhaitez modifier le corps du message pour les futurs messages, vous conserverez les mêmes propriétés, c'est-à-dire que vos messages seront toujours text/plain
et que la delivery_mode
sera toujours AMQPMessage::DELIVERY_MODE_PERSISTENT
. Si vous créez une nouvelle instance AMQPMessage
pour chaque message publié, ces propriétés devraient être réencodées au format binaire AMQP. Vous pouvez éviter tout cela en réutilisant simplement le AMQPMessage
, puis en réinitialisant le corps du message comme ceci:
$ msg-> setBody ($ body2); $ ch-> basic_publish ($ msg, $ échange);
AMQP n'impose aucune limite à la taille des messages; Si un très grand message est reçu par un consommateur, la limite de mémoire de PHP peut être atteinte dans la bibliothèque avant que le rappel transmis à basic_consume
ne soit appelé.
Pour éviter cela, vous pouvez appeler la méthode AMQPChannel::setBodySizeLimit(int $bytes)
sur votre instance de chaîne. Les tailles de corps dépassant cette limite seront tronquées et livrées à votre rappel avec un drapeau AMQPMessage::$is_truncated
SET sur true
. La propriété AMQPMessage::$body_size
reflétera la vraie taille corporelle d'un message reçu, qui sera supérieur à strlen(AMQPMessage::getBody())
si le message a été tronqué.
Notez que toutes les données supérieures à la limite sont lues à partir du canal AMQP et immédiatement rejetées, il n'y a donc aucun moyen de les récupérer dans votre rappel. Si vous avez un autre consommateur qui peut gérer des messages avec des charges utiles plus importantes, vous pouvez utiliser basic_reject
ou basic_nack
pour dire au serveur (qui a toujours une copie complète) pour le transmettre à un échange de lettres morts.
Par défaut, aucune troncature ne se produira. Pour désactiver la troncature sur un canal qui l'a activé, passer 0
(ou null
) à AMQPChannel::setBodySizeLimit()
.
Certains clients RabbitMQ utilisant des mécanismes de récupération de connexion automatisés pour reconnecter et récupérer les canaux et les consommateurs en cas d'erreurs de réseau.
Étant donné que ce client utilise un seul fil, vous pouvez configurer la récupération de connexion à l'aide du mécanisme de gestion des exceptions.
Exceptions qui pourraient être jetées en cas d'erreurs de connexion:
PhpamqplibExceptionaMQPConnectionClosedExceptionPhpamqplibExceptionaMQPioExceptionRuntimeExceptionErrogexception
Certaines autres exceptions peuvent être lancées, mais la connexion peut toujours être là. C'est toujours une bonne idée de nettoyer une ancienne connexion lors de la manipulation d'une exception avant de se reconnecter.
Par exemple, si vous souhaitez configurer une connexion en récupération:
$ connection = null; $ channel = null; while (true) {try {$ connection = new AmqPStreamConnection (hôte, port, utilisateur, pass, vhost); // votre code d'application va ici.do_something_with_connection ($ connection); } catch (amqpruntimeException $ e) {echo $ e-> getMessage (); cleanup_connection ($ connection); usleep (wait_before_reconnect_us); } catch (RuntimeException $ e) {CleanUp_Connection ($ connection); usleep (wait_before_reconnect_us); } catch (errorexception $ e) {Cleanup_Connection ($ connection); usleep (wait_before_reconnect_us); } }
Un exemple complet est dans demo/connection_recovery_consume.php
.
Ce code reconnectera et réessayer le code d'application à chaque fois que l'exception se produit. Certaines exceptions peuvent toujours être lancées et ne doivent pas être gérées dans le cadre du processus de reconnexion, car elles peuvent être des erreurs d'application.
Cette approche a du sens principalement pour les applications de consommation, les producteurs nécessiteront un code d'application supplémentaire pour éviter de publier le même message plusieurs fois.
C'était un exemple le plus simple, dans une application réelle, vous voudrez peut-être contrôler le nombre de RetT et peut-être dégrader gracieusement le temps d'attente pour reconnecter.
Vous pouvez trouver un exemple plus excessif dans # 444
Si vous avez installé le dépôt d'extension PCNTL du signal sera géré lorsque le consommateur ne traite pas le message.
$ pcntlHandler = fonction ($ signal) {switch ($ signal) {case sigter: case sigusr1: case sigint: // quelques trucs avant d'arrêter le consommateur, de supprimer le verrouillage etcpcntl_signal ($ signal, sig_dfl); // restaurer handlerPosix_kill (posix_getpid (), $ signal); // Kill Self with Signal, voir https://www.cons.org/cracauer/sigint.htmlcase sighup: // certaines choses pour redémarrer Consumerbreak; par défaut: // ne rien faire} }; pcntl_signal (siigter, $ pcntlhandler); pcntl_signal (sigint, $ pcntlhandler); pcntl_signal (sigusr1, $ pcntlhandler); pcntl_signal (sighup, $ pcntlandler);
Pour désactiver cette fonctionnalité, définissez simplement AMQP_WITHOUT_SIGNALS
comme true
<? phpdefine ('amqp_without_signals', true); ... plus de code
Si vous avez installé une extension PCNTL et utilisez PHP 7.1 ou plus, vous pouvez enregistrer un expéditeur de battements de cœur basé sur le signal.
<? Php $ Sender = new pcntlHeartBeatSender ($ connection); $ SENDER-> registre (); ... code $ Sender-> ungister ();
Si vous voulez savoir ce qui se passe au niveau du protocole, ajoutez la constante suivante à votre code:
<? phpdefine ('amqp_debug', true); ... plus de code?>
Pour exécuter le type de référence de publication / consommer:
$ Make Benchmark
Veuillez consulter la contribution pour plus de détails.
Si vous souhaitez toujours utiliser l'ancienne version du protocole, vous pouvez le faire en définissant la constante suivante dans votre code de configuration:
Define ('AMQP_PROTOCOL', '0,8');
La valeur par défaut est '0.9.1'
.
Si pour une raison quelconque, vous ne voulez pas utiliser de compositeur, vous devez avoir un autoloader en place pour les classes de bibliothèque. Les gens se sont déclarés utiliser cet autoloader avec succès.
Vous trouverez ci-dessous le contenu du fichier ReadMe d'origine. Les crédits vont aux auteurs originaux.
PHP Library Implémentation du protocole de mise en file d'attente de messages avancé (AMQP).
La bibliothèque est le port du code Python de py-amqplib http://barryp.org/software/py-amqplib/
Il a été testé avec RabbitMQ Server.
Page d'accueil du projet: http://code.google.com/p/php-amqplib/
Pour la discussion, veuillez rejoindre le groupe:
http://groups.google.com/group/php-amqplib-devel
Pour les rapports de bogues, veuillez utiliser le système de suivi des bogues sur la page du projet.
Les correctifs sont les bienvenus!
Auteur: Vadim Zaliva [email protected]