Эта библиотека является чистой реализацией PHP протокола AMQP 0-9-1. Он был проверен против Rabbitmq.
Библиотека использовалась для примеров PHP RabbitMQ в действии и официальных учебных пособий RabbitMQ.
Обратите внимание, что этот проект выпущен с кодексом поведения участника. Участвуя в этом проекте, вы соглашаетесь соблюдать его условия.
Благодаря Videlalvaro и Postalservice14 за создание php-amqplib
.
В настоящее время пакет поддерживается Рамунасом Дронга, Люком Баккеном и несколькими инженерами VMware, работающими над Rabbitmq.
Начиная с версии 2.0. Эта библиотека использует AMQP 0.9.1
по умолчанию и, таким образом, требует версии Rabbitmq 2.0 или более поздней. Обычно обновления сервера не требуют никаких изменений кода приложения, поскольку протокол меняется очень редко, но, пожалуйста, проведите собственное тестирование перед обновлением.
Поскольку библиотека использует AMQP 0.9.1
мы добавили поддержку для следующих расширений RabbitMQ:
Обмен на обмен привязками
Основное наступление
Издатель подтверждает
Потребитель отменил уведомление
Расширения, которые изменяют существующие методы, такие как alternate exchanges
также поддерживаются.
Enqueue/Amqp-Lib-это объединяемая обертка AMQP.
AMQProxy - это прокси -библиотека с соединением и объединением/повторным использованием каналов. Это обеспечивает более низкое соединение и отток канала при использовании PHP-AMQPlib, что приводит к меньшему использованию ЦП RabbitMQ.
Убедитесь, что у вас установлен композитор, затем запустите следующую команду:
$ Composer требует PHP-AMQPLIB/PHP-AMQPLIB
Это принесет библиотеку и ее зависимости в папке поставщика. Затем вы можете добавить следующее в ваши файлы .php, чтобы использовать библиотеку
require_once __dir __. '/vendor/autoload.php';
Затем вам нужно use
соответствующие классы, например:
Используйте phpamqplibconnectionamqpstreamconnection; используйте phpamqplibmessageamqpmessage;
С запуском Rabbitmq открывается два терминала и на первом выполнении следующих команд, чтобы запустить потребителя:
$ cd php-amqplib/demo $ php amqp_consumer.php
Тогда на другом терминале DO:
$ cd php-amqplib/demo $ php amqp_publisher.php немного текста для публикации
Вы должны увидеть сообщение, поступающее в процесс на другом терминале
Затем, чтобы остановить потребителя, отправьте ему сообщение quit
:
$ php amqp_publisher.php quit
Если вам нужно прослушать розетки, используемые для подключения к RabbitMQ, см. Пример в потребителе, не связанном с блокированием.
$ php amqp_consumer_non_blocking.php
Пожалуйста, смотрите ChangeLog для получения дополнительной информации, что изменилось в последнее время.
http://php-amqplib.github.io/php-amqplib/
Чтобы не повторяться, если вы хотите узнать больше об этой библиотеке, пожалуйста, обратитесь к официальным учебным пособиям RabbitMQ.
amqp_ha_consumer.php
: демо. Использование зеркальных очередей.
amqp_consumer_exclusive.php
и amqp_publisher_exclusive.php
: DEMOS FANOUT FANOUT FANOUT с использованием эксклюзивных очередей.
amqp_consumer_fanout_{1,2}.php
и amqp_publisher_fanout.php
: Demos Fanout Bearch с названными очередями.
amqp_consumer_pcntl_heartbeat.php
: DEMOS Использование отправителя сердца на основе сигналов.
basic_get.php
: демо, получая сообщения из очередей с использованием базового вызова AMQP.
Если у вас есть кластер нескольких узлов, к которым может подключаться ваше приложение, вы можете запустить соединение с массивом хостов. Для этого вы должны использовать статический метод create_connection
.
Например:
$ connection = amqpstreamconnection :: create_connection ([[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[ ['host' => host1, 'port' => port, 'user' => user, 'password' => pass, 'vhost' => vhost], ['host' => host2, 'port' => port, 'user' => user, 'password' => pass, 'vhost' => vhost] ], $ options);
Этот код будет попытаться сначала подключиться к HOST1
и подключиться к HOST2
если первое соединение не удалось. Метод возвращает объект соединения для первого успешного соединения. Если все подключения не пройдут, это выбросит исключение из последней попытки соединения.
См. demo/amqp_connect_multiple_hosts.php
для получения дополнительных примеров.
Допустим, у вас есть процесс, который генерирует кучу сообщений, которые будут опубликованы на одном и том же exchange
с использованием того же самого routing_key
и опций, таких как mandatory
. Затем вы можете использовать функцию библиотеки batch_basic_publish
. Вы можете собирать такие сообщения, как это:
$ msg = new amqpmessage ($ msg_body); $ ch-> batch_basic_publish ($ msg, $ Exchange); $ msg2 = new amqpmessage ($ msg_body); $ ch-> batch_basic_publish ($ msg2, $ earch);
а затем отправьте партию как это:
$ ch-> publish_batch ();
Допустим, наша программа должна читать из файла, а затем опубликовать одно сообщение на строку. В зависимости от размера сообщения, вам придется решить, когда лучше отправить партию. Вы можете отправлять его каждые 50 сообщений или каждые сто. Это зависит от вас.
Еще один способ ускорить публикацию вашего сообщения - повторно использовать экземпляры сообщения AMQPMessage
. Вы можете создать свое новое сообщение таким образом:
$ properties = array ('content_type' => 'text/plain', 'deliver_mode' => amqpmessage :: delivery_mode_persistent); $ msg = new amqpmessage ($ body, $ properties); $ ch-> basic_publish ($ msg, $ обмен);
Теперь допустим, что, хотя вы хотите изменить тело сообщения для будущих сообщений, вы будете сохранять такие же свойства, то есть ваши сообщения все еще будут text/plain
, а delivery_mode
все равно будет AMQPMessage::DELIVERY_MODE_PERSISTENT
. Если вы создаете новый экземпляр AMQPMessage
для каждого опубликованного сообщения, то эти свойства должны быть перекодированы в двоичном формате AMQP. Вы можете избежать всего этого, просто повторно используя AMQPMessage
, а затем сбросить тело сообщения так:
$ msg-> setbody ($ body2); $ ch-> basic_publish ($ msg, $ Exchange);
AMQP не налагает ограничения на размер сообщений; Если потребитель получает очень большое сообщение, ограничение памяти PHP может быть достигнуто в библиотеке до того, как будет вызван обратный вызов в basic_consume
.
Чтобы избежать этого, вы можете вызвать метод AMQPChannel::setBodySizeLimit(int $bytes)
в экземпляре канала. Размеры тела, превышающие этот лимит, будут усечены и доставлены на ваш обратный вызов с помощью флага AMQPMessage::$is_truncated
установленное на true
. Свойство AMQPMessage::$body_size
будет отражать истинный размер тела полученного сообщения, который будет выше, чем strlen(AMQPMessage::getBody())
если сообщение было усечено.
Обратите внимание, что все данные выше предела считываются с канала AMQP и сразу же отброшены, поэтому нет возможности получить их в рамках вашего обратного вызова. Если у вас есть другой потребитель, который может обрабатывать сообщения с большими полезными нагрузками, вы можете использовать basic_reject
или basic_nack
, чтобы сообщить серверу (у которого все еще есть полная копия), чтобы перенаправить его в обмен мертвыми буквами.
По умолчанию не произойдет усечение. Чтобы отключить усечение на канале, который включил его, передайте 0
(или null
) в AMQPChannel::setBodySizeLimit()
.
Некоторые клиенты RabbitMQ, использующие автоматические механизмы восстановления соединения для восстановления и восстановления каналов и потребителей в случае сетевых ошибок.
Поскольку этот клиент использует однопоточную, вы можете настроить восстановление подключения, используя механизм обработки исключений.
Исключения, которые могут быть брошены в случае ошибок соединения:
PhpamqplibexceptionAmqpConnectionClosExceptionphpamqplibexceptionAmqPioExceptionRuntImeExceptionErrorexception
Некоторые другие исключения могут быть брошены, но соединение все еще может быть там. Это всегда хорошая идея, чтобы очистить старое соединение при обращении с исключением, прежде чем воссоединиться.
Например, если вы хотите настроить восстанавливающее соединение:
$ connection = null; $ channel = null; while (true) {try {$ connection = new amqpstreamconnection (host, port, user, pass, vhost); // код вашего приложения идет здесь. do_something_with_connection ($ connection); } catch (amqpruntimeexception $ e) {echo $ e-> getMessage (); cleanup_connection ($ connection); usle (wait_before_reconnect_us); } catch (runtimeexception $ e) {cleanup_connection ($ connection); unsleep (wait_before_reconnect_us); } catch (errorexception $ e) {cleanup_connection ($ connection); usle (wait_before_reconnect_us); } }
Полный пример - demo/connection_recovery_consume.php
.
Этот код воссоединит и повторно повторит код приложения каждый раз, когда происходит исключение. Некоторые исключения по -прежнему могут быть брошены и не следует обрабатывать как часть процесса повторного соединения, потому что они могут быть ошибками применения.
Этот подход имеет смысл в основном для потребительских приложений, производителям потребуется дополнительный код приложения, чтобы избежать публикации одного и того же сообщения несколько раз.
Это был самый простой пример, в реальном приложении, которое вы можете контролировать, и, возможно, изящно ухудшить время ожидания для повторного подключения.
Вы можете найти более чрезмерный пример в #444
Если вы установили расширение PCNTL, диспетчерирование сигнала будет обрабатываться, когда потребитель не обрабатывает сообщение.
$ pcntlhandler = function ($ signal) {switch ($ signal) {case sigterm: case sigusr1: case sigint: // Некоторые вещи перед остановкой потребителя, например, удаление блокировки и т. Д. // восстановить handlerposix_kill (posix_getpid (), $ signal); // Убивайте себя с сигналом, см. https://www.cons.org/cracauer/sigint.htmlcase sighup: // Некоторые вещи для перезапуска ConsumerBreak; default: // ничего не делайте} }; pcntl_signal (sigterm, $ pcntlhandler); pcntl_signal (sigint, $ pcntlhandler); pcntl_signal (sigusr1, $ pcntlhandler); pcntl_signal (sighup, $ pcntlhandler);
Чтобы отключить эту функцию просто определить постоянную AMQP_WITHOUT_SIGNALS
как true
<? phpdefine ('amqp_without_signals', true); ... больше кода
Если вы установили расширение PCNTL и используете PHP 7.1 или более, вы можете зарегистрировать отправитель HeartBeat на основе сигнала.
<? php $ sender = new pcntlheartbeatsender ($ connection); $ sender-> Register (); ... код $ sender-> unregister ();
Если вы хотите знать, что происходит на уровне протокола, добавьте следующую постоянную в свой код:
<? phpdefine ('amqp_debug', true); ... больше кода?>
Чтобы запустить тип Publishing/Pully Benchmark:
$ Make Tenchmark
Пожалуйста, смотрите для получения подробной информации.
Если вы все еще хотите использовать старую версию протокола, вы можете сделать это, установив следующую постоянную в коде конфигурации:
определить ('amqp_protocol', '0,8');
Значение по умолчанию составляет '0.9.1'
.
Если по какой -то причине вы не хотите использовать композитор, вам нужно иметь автозагрузчик для классов библиотеки. Люди сообщают, что с успехом используют этот автозазагрузчик.
Ниже приведено исходное содержимое файла README. Кредиты доставляются оригинальным авторам.
Библиотека PHP реализует расширенный протокол очереди сообщений (AMQP).
Библиотека является портом Python Code Py-Amqplib http://barryp.org/software/py-amqplib/
Он был протестирован с помощью сервера RabbitMQ.
Домашняя страница проекта: http://code.google.com/p/php-amqplib/
Для обсуждения, пожалуйста, присоединяйтесь к группе:
http://groups.google.com/group/php-amqplib-devel
Для отчетов об ошибках, пожалуйста, используйте систему отслеживания ошибок на странице проекта.
Патчи очень приветствуются!
Автор: vadim Zaliva [email protected]