Esta biblioteca es una implementación PHP pura del protocolo AMQP 0-9-1. Se ha probado contra Rabbitmq.
La biblioteca se utilizó para los ejemplos de PHP de RabbitMQ en acción y los tutoriales oficiales de RabbitMQ.
Tenga en cuenta que este proyecto se publica con un código de conducta de contribuyentes. Al participar en este proyecto, acepta cumplir con sus términos.
Gracias a Videlalvaro y PostalService14 por crear php-amqplib
.
El paquete ahora es mantenido por Ramūnas Dronga, Luke Bakken y varios ingenieros de VMware que trabajan en RabbitMQ.
Comenzando con la versión 2.0, esta biblioteca usa AMQP 0.9.1
de forma predeterminada y, por lo tanto, requiere la versión RabbitMQ 2.0 o posterior. Por lo general, las actualizaciones del servidor no requieren ningún cambio de código de aplicación, ya que el protocolo cambia con poca frecuencia, pero realice sus propias pruebas antes de actualizar.
Dado que la biblioteca usa AMQP 0.9.1
agregamos soporte para las siguientes extensiones de RabbitMQ:
Intercambiar para intercambiar enlaces
Nack básico
El editor confirma
Notificación de cancelación del consumidor
También se admiten extensiones que modifican métodos existentes como alternate exchanges
.
Enqueue/AMQP-LIB es un envoltorio compatible con interoperabilidad AMQP.
AMQProxy es una biblioteca proxy con conexión y agrupación/reutilización de canales. Esto permite una conexión más baja y una rotación de canales cuando se usa PHP-AMQPLIB, lo que lleva a un menor uso de la CPU de RabbitMQ.
Asegúrese de tener instalado el compositor, luego ejecute el siguiente comando:
$ compositor requiere PHP-AMQPLIB/PHP-AMQPLIB
Eso buscará la biblioteca y sus dependencias dentro de su carpeta de proveedores. Luego puede agregar lo siguiente a sus archivos .php para usar la biblioteca
require_once __dir __. '/Vendor/Autoload.php';
Luego debe use
las clases relevantes, por ejemplo:
use phpamqPlibConnectionAmQPStreamConnection; use phpamqPlibMessAmeamqpMessage;
Con RabbitMQ ejecutando dos terminales y en el primero ejecuta los siguientes comandos para iniciar el consumidor:
$ CD PHP-AMQPLIB/Demo $ php amqp_consumer.php
Luego, en el otro terminal, hacer:
$ CD PHP-AMQPLIB/Demo $ php amqp_publisher.php Algunos texto para publicar
Debería ver que el mensaje llegue al proceso en el otro terminal
Luego para detener al consumidor, envíelo el mensaje quit
:
$ php amqp_publisher.php renuncia
Si necesita escuchar los enchufes utilizados para conectarse a RabbitMQ, vea el ejemplo en el consumidor que no es bloqueo.
$ php amqp_consumer_non_blowing.php
Consulte ChangeLog para obtener más información lo que ha cambiado recientemente.
http://php-amqplib.github.io/php-amqplib/
Para no repetirnos, si desea obtener más información sobre esta biblioteca, consulte los tutoriales oficiales de RabbitMQ.
amqp_ha_consumer.php
: Demoss el uso de colas reflejadas.
amqp_consumer_exclusive.php
y amqp_publisher_exclusive.php
: intercambios de fanáticos de demostraciones utilizando colas exclusivas.
amqp_consumer_fanout_{1,2}.php
y amqp_publisher_fanout.php
: intercambios de faneut de demostraciones con colas con nombre.
amqp_consumer_pcntl_heartbeat.php
: Uso del remitente Heartbeat basado en la señal Demos.
basic_get.php
: demostraciones obteniendo mensajes de las colas utilizando la llamada BASIC GET AMQP.
Si tiene un clúster de múltiples nodos a los que su aplicación puede conectarse, puede iniciar una conexión con una variedad de hosts. Para hacerlo, debe usar el método estático create_connection
.
Por ejemplo:
$ conection = amqpStreamConnection :: create_connection ([[ ['host' => host1, 'puerto' => puerto, 'usuario' => usuario, 'contraseña' => pase, 'vhost' => vhost], ['host' => host2, 'puerto' => puerto, 'usuario' => usuario, 'contraseña' => pase, 'vhost' => vhost] ], $ opciones);
Este código intentará conectarse primero a HOST1
y conectarse a HOST2
si la primera conexión falla. El método devuelve un objeto de conexión para la primera conexión exitosa. Si todas las conexiones fallan, arrojará la excepción del último intento de conexión.
Ver demo/amqp_connect_multiple_hosts.php
para más ejemplos.
Supongamos que tiene un proceso que genera un montón de mensajes que se publicarán en el mismo exchange
utilizando el mismo routing_key
y opciones como mandatory
. Luego podría hacer uso de la función de biblioteca batch_basic_publish
. Puedes lotes de mensajes como este:
$ msg = new AmqPMessage ($ msg_body); $ ch-> batch_basic_publish ($ msg, $ intercambio); $ msg2 = nuevo amqpMessage ($ msg_body); $ ch-> batch_basic_publish ($ msg2, $ intercambio);
Y luego envíe el lote así:
$ ch-> publish_batch ();
Digamos que nuestro programa necesita leer desde un archivo y luego publicar un mensaje por línea. Dependiendo del tamaño del mensaje, tendrá que decidir cuándo es mejor enviar el lote. Puede enviarlo cada 50 mensajes, o cada cien. Eso depende de ti.
Otra forma de acelerar la publicación de su mensaje es reutilizar las instancias de mensajes AMQPMessage
. Puede crear su nuevo mensaje como este:
$ Properties = Array ('Content_Type' => 'Text/Plain', 'Delivery_mode' => AmqpMessage :: Delivery_mode_persistent); $ msg = new AmqpMessage ($ Body, $ Properties); $ Ch-> Basic_Publish ($ Msg, $ intercambio);
Ahora digamos que, si bien desea cambiar el cuerpo del mensaje para mensajes futuros AMQPMessage::DELIVERY_MODE_PERSISTENT
mantendrá las mismas propiedades, es decir, sus mensajes seguirán siendo text/plain
y el delivery_mode
seguirá Si crea una nueva instancia AMQPMessage
para cada mensaje publicado, entonces esas propiedades tendrían que volver a codificar en el formato binario AMQP. Puede evitar todo eso simplemente reutilizando el AMQPMessage
y luego restablecer el cuerpo del mensaje así:
$ msg-> setbody ($ body2); $ ch-> básico_publish ($ msg, $ intercambio);
AMQP no impone límite al tamaño de los mensajes; Si un consumidor recibe un mensaje muy grande, se puede alcanzar el límite de memoria de PHP dentro de la biblioteca antes de que se llame a la devolución de llamada a basic_consume
.
Para evitar esto, puede llamar al método AMQPChannel::setBodySizeLimit(int $bytes)
en su instancia de canal. El tamaño del cuerpo que excede este límite se truncará y se entregará a su devolución de llamada con una bandera AMQPMessage::$is_truncated
establecido en true
. La propiedad AMQPMessage::$body_size
reflejará el verdadero tamaño corporal de un mensaje recibido, que será más alto que strlen(AMQPMessage::getBody())
si el mensaje ha sido truncado.
Tenga en cuenta que todos los datos por encima del límite se leen desde el canal AMQP e inmediatamente se descartan, por lo que no hay forma de recuperarlos dentro de su devolución de llamada. Si tiene otro consumidor que puede manejar mensajes con cargas útiles más grandes, puede usar basic_reject
o basic_nack
para decirle al servidor (que todavía tiene una copia completa) que lo reenvíe a un intercambio de letras muertas.
Por defecto, no se producirá truncamiento. Para deshabilitar el truncamiento en un canal que lo ha habilitado, pase 0
(o null
) a AMQPChannel::setBodySizeLimit()
.
Algunos clientes de RabbitMQ que utilizan mecanismos de recuperación de conexión automatizados para reconectar y recuperar canales y consumidores en caso de errores de red.
Dado que este cliente está utilizando un solo hilo, puede configurar la recuperación de conexión utilizando el mecanismo de manejo de excepciones.
Excepciones que podrían ser lanzadas en caso de errores de conexión:
PhpamqPlibExceptionAmqpConnectionClosedExceptionPhPamqPlibExceptionAmqpioExceptionRuntimeExceptionErException
Se pueden lanzar algunas otras excepciones, pero la conexión aún puede estar allí. Siempre es una buena idea limpiar una conexión antigua al manejar una excepción antes de volver a conectarse.
Por ejemplo, si desea configurar una conexión de recuperación:
$ conecte = null; $ canal = null; while (true) {try {$ conection = new AmqPStreamConnection (host, puerto, usuario, pase, vhost); // Su código de aplicación va aquí.do_somthing_with_connection ($ conexión); } Catch (AmqprunteException $ e) {echo $ e-> getMessage (); limpiuez_connection ($ conexión); usleep (wait_before_reconnect_us); } Catch (RuntimeException $ E) {CleanUp_Connection ($ Connection); usleep (wait_before_reconnect_us); } Catch (ErrorException $ E) {CleanUp_Connection ($ Connection); usleep (wait_before_reconnect_us); } }
Un ejemplo completo es en demo/connection_recovery_consume.php
.
Este código volverá a conectar y volverá a intentar el código de la aplicación cada vez que ocurra la excepción. Todavía se pueden lanzar algunas excepciones y no deben manejarse como parte del proceso de reconexión, porque pueden ser errores de aplicación.
Este enfoque tiene sentido principalmente para las aplicaciones de consumo, los productores requerirán algún código de aplicación adicional para evitar publicar el mismo mensaje varias veces.
Este fue un ejemplo más simple, en una aplicación de la vida real, es posible que desee controlar el recuento de Retr y tal vez degradar con gracia el tiempo de espera para la reconexión.
Puede encontrar un ejemplo más excesivo en #444
Si ha instalado la extensión PCNTL, el envío de la señal se manejará cuando el consumidor no esté procesando el mensaje.
$ pcntlhandler = function ($ señal) {switch ($ señal) {case sigter: case sigusr1: case sigint: // algunas cosas antes de detener al consumidor, por ejemplo, eliminar el bloqueo etcpcntl_signal ($ señal, sig_dfl); // restaurar handlerPosix_kill (posix_getpid (), $ señal); // matar a sí mismo con señal, ver https://www.cons.org/cracauer/sigint.htmlcase sighUp: // algunas cosas para reiniciar Consumerbreak; predeterminado: // no hacer nada} }; PCNTL_SIGNAL (SigterM, $ pcntlhandler); pcntl_signal (sigint, $ pcntlhandler); pcntl_signal (sigusr1, $ pcntlhandler); pcntl_signal (sighup, $ pcntlhandler);
Para deshabilitar esta característica, solo defina constante AMQP_WITHOUT_SIGNALS
como true
<? Phpdefine ('amqp_without_signals', true); ... más código
Si ha instalado la extensión de PCNTL y está utilizando PHP 7.1 o más, puede registrar un remitente del corazón basado en la señal.
<? php $ sender = new PCNTLHeartBeatsender ($ Connection); $ Sender-> registrar (); ... Código $ Sender-> unregister ();
Si desea saber qué está pasando a nivel de protocolo, agregue la siguiente constante a su código:
<? Phpdefine ('amqp_debug', true); ... ¿Más código?>
Para ejecutar el tipo de referencia de publicación/consumir:
$ Make Benchmark
Consulte contribuyendo para obtener más detalles.
Si aún desea usar la versión anterior del protocolo, puede hacerlo configurando la siguiente constante en su código de configuración:
define ('amqp_protocol', '0.8');
El valor predeterminado es '0.9.1'
.
Si por alguna razón no desea usar el compositor, entonces debe tener un autoloader en su lugar para las clases de la biblioteca. La gente ha informado que usan este autoloader con éxito.
A continuación se muestra el contenido original del archivo ReadMe. Los créditos van a los autores originales.
Biblioteca PHP Implementación del Protocolo de cola de mensajes avanzados (AMQP).
La biblioteca es el código de Python de Py-Amqplib http://barryp.org/software/py-amqplib/
Se ha probado con el servidor RabbitMQ.
Página de inicio del proyecto: http://code.google.com/p/php-amqplib/
Para discusión, únase al grupo:
http://groups.google.com/group/php-amqplib-devel
Para informes de errores, utilice el sistema de seguimiento de errores en la página del proyecto.
¡Los parches son muy bienvenidos!
Autor: vadim zaliva [email protected]