该库是AMQP 0-9-1协议的纯PHP实现。它已经针对RabbitMQ进行了测试。
该图书馆用于行动中的RabbitMQ和官方RabbitMQ教程的PHP例子。
请注意,该项目已使用贡献者的行为准则发布。通过参加该项目,您同意遵守其条款。
感谢Videlalvaro和PostalService14创建php-amqplib
。
该包现在由RamūnasDronga,Luke Bakken和几位从事RabbitMQ工作的VMware工程师维护。
从版本2.0开始,该库默认使用AMQP 0.9.1
,因此需要RabbitMQ 2.0或更高版本。通常,服务器升级不需要任何应用程序代码更改,因为协议很少更改,但请在升级前进行自己的测试。
由于库使用AMQP 0.9.1
我们添加了对以下RabbitMQ扩展的支持:
交换绑定
基本nack
出版商确认
消费者取消通知
还支持修改现有方法(例如alternate exchanges
的扩展。
Enqueue/AMQP-LIB是AMQP Interop兼容包装器。
Amqproxy是一个具有连接和频道池/重复使用的代理库。当使用PHP-AMQPLIB时,这允许较低的连接和通道流动,从而减少了CPU的RabbitMQ。
确保您安装了作曲家,然后运行以下命令:
$作曲家需要php-amqplib/php-amqplib
这将在供应商文件夹中获取库及其依赖关系。然后,您可以将以下内容添加到.php文件中以使用库
require_once __dir __。
然后,您需要use
相关类,例如:
使用phpamqplibConnectionAmqpStreamConnection;使用phpamqplibmessageamqpmessage;
随着RabbitMQ的运行,打开两个终端,第一个终端执行以下命令来启动消费者:
$ CD PHP-AMQPLIB/DEMO $ php amqp_consumer.php
然后在另一个终端上做:
$ CD PHP-AMQPLIB/DEMO $ php amqp_publisher.php一些文本要发布
您应该看到消息到达另一个终端的过程
然后停止消费者,将quit
消息发送到它:
$ php amqp_publisher.php退出
如果您需要收听用于连接到RabbitMQ的插座,请在非阻止消费者中查看示例。
$ PHP AMQP_CONSUMER_NON_BLOCKING.PHP
有关更多信息,请参阅ChangElog最近发生了什么变化。
http://php-amqplib.github.io/php-amqplib/
为了不重复自己,如果您想了解有关此图书馆的更多信息,请参考官方的RabbitMQ教程。
amqp_ha_consumer.php
:演示镜像队列的使用。
amqp_consumer_exclusive.php
和amqp_publisher_exclusive.php
:使用独家队列的演示粉丝交换。
amqp_consumer_fanout_{1,2}.php
和amqp_publisher_fanout.php
:demos fanout fanout交换与命名Queues。
amqp_consumer_pcntl_heartbeat.php
:基于演示信号的心跳发件人的用法。
basic_get.php
:DEMO通过使用基本GET AMQP调用从队列中获取消息。
如果您的应用程序可以连接到一个多个节点群,则可以使用一系列主机开始连接。为此,您应该使用create_connection
静态方法。
例如:
$ connection = amqpstreamConnection :: create_connection([[ ['host'=> host1,'port'=> port,'user'=>用户,'password'=> pass,'vhost'=> vhost], ['host'=> host2,'port'=>端口,'user'=>用户,'password'=> pass,'vhost'=> vhost] ],$ options);
此代码将尝试首先连接到HOST1
,并在第一个连接失败时连接到HOST2
。该方法返回第一个成功连接的连接对象。如果所有连接失败,它将引发最后一次连接尝试的异常。
有关更多示例,请参见demo/amqp_connect_multiple_hosts.php
。
假设您有一个过程,该过程会生成一堆消息,这些消息将使用相同的routing_key
和Onsporty(例如mandatory
发布到同一exchange
。然后,您可以使用batch_basic_publish
库功能。您可以这样批处理消息:
$ msg = new amqpmessage($ msg_body); $ ch-> batch_basic_publish($ msg,$ exchange); $ msg2 = new amqpmessage($ msg_body); $ ch-> batch_bath_bast_basic_publish($ msg2,$ msg2,$ exchange);
然后发送批次:
$ ch-> prublice_batch();
假设我们的程序需要从文件中读取,然后每行发布一条消息。根据消息大小,您将不得不决定何时发送批次。您可以每50条或一百个消息发送每50条。这取决于你。
加快消息发布的另一种方法是重复使用AMQPMessage
消息实例。您可以创建这样的新消息:
$ properties = array('content_type'=>'text/plain','velivery_mode'=> amqpmessage :: drively_mode_persistent); $ msg = new amqpmessage($ ch- hody,$ properties; $ ch--> basic> basic> basic_perprish($ msg,$ msg,$ msg,$交换);
现在,假设您想更改消息主体以获取未来消息,但您将保留相同的属性,也就是说,您的消息仍将是text/plain
,并且delivery_mode
仍将是AMQPMessage::DELIVERY_MODE_PERSISTENT
。如果为每个已发布的消息创建一个新的AMQPMessage
实例,则必须以AMQP二进制格式重新编码这些属性。您可以通过仅重用AMQPMessage
,然后以这样的方式重置消息主体来避免所有这些问题:
$ msg-> setBody($ body2); $ ch-> basic_publish($ msg,$ exchange);
AMQP对消息的大小没有限制;如果消费者收到一条非常大的消息,则可以在调用basic_consume
回调之前,在库中达到PHP的内存限制。
为避免这种情况,您可以在频道实例上调用方法AMQPChannel::setBodySizeLimit(int $bytes)
。超过此限制的车身尺寸将被截断,并通过AMQPMessage::$is_truncated
标志设置为true
。属性AMQPMessage::$body_size
将反映接收到的消息的真实身体大小,如果消息已截断,该消息将高于strlen(AMQPMessage::getBody())
。
请注意,限制上方的所有数据均可从AMQP频道读取并立即丢弃,因此无法在您的回调中检索它。如果您有另一个可以处理具有较大有效载荷的消息的消费者,则可以使用basic_reject
或basic_nack
告诉服务器(仍然具有完整副本)将其转发到死信交换。
默认情况下,不会发生截断。要禁用已启用它的通道上的截断,请将0
(或null
)传递到AMQPChannel::setBodySizeLimit()
。
在网络错误时,一些使用自动连接恢复机制使用自动连接恢复机制来重新连接和恢复频道和消费者的客户。
由于该客户端使用单线程,因此您可以使用异常处理机制来设置连接恢复。
在连接错误的情况下可能会抛出的例外:
phpamqplibexceptionAmqpConnectionClosedexceptionphpamqplibexceptionAmqpioExceptionRuntimeExceptionErrorexception
可能会抛出其他一些例外,但是连接仍然可以存在。在重新连接之前处理异常时,清理旧连接总是一个好主意。
例如,如果要设置恢复连接:
$ connection = null; $ channel = null; while(true){try {$ connection = new AmqpStreamConnection(主机,端口,用户,pass,vhost); // //您的应用程序代码转到此处。do_something_with_with_connection($ connection); } catch(amqpruntimeException $ e){echo $ e-> getMessage(); clearup_connection($ connection); usleep(wait_before_reconnect_us); } catch(runtimeException $ e){callup_connection($ connection); usleep(wait_before_reconnect_us); } catch(errorexception $ e){callup_connection($ connection); usleep(wait_before_reconnect_us); } }
一个完整的示例是在demo/connection_recovery_consume.php
中。
每次发生异常时,此代码将重新连接并重试应用代码。一些例外仍然可以被抛出,不应作为重新连接过程的一部分处理,因为它们可能是应用程序错误。
这种方法主要对于消费者应用程序是有意义的,生产者将需要一些其他应用程序代码,以避免多次发布相同的消息。
这是一个最简单的例子,在现实生活中,您可能需要控制reter计数,并可能优雅地降低等待时间重新连接。
您可以在#444中找到一个更过大的例子
如果您已经安装了PCNTL扩展,则当消费者不处理消息时,将处理信号。
$ pcntlhandler =函数($ signal){switch($ signal){case sigterm:case sigusr1:case sigint://在停止消费者之前的一些东西,例如delete lock etpepcntl_signal($ signal,sig_dfl); //还原handlerposix_kill(posix_getpid(),$ signal); //用信号杀死自我,请参阅https://www.cons.org/cracauer/sigint.htmlcase sighup://一些可以重新启动消费者break的东西;默认://什么nothing}} }; pcntl_signal(sigterm,$ pcntlhandler); pcntl_signal(sigint,$ pcntlhandler); pcntl_signal(sigusr1,$ pcntlhandler); pcntl_signal(sighup,yighup,yighup,$ pcntlhandler);
为了禁用此功能,将常数AMQP_WITHOUT_SIGNALS
定义为true
<?phpdefine('amqp_without_signals',true); ...更多代码
如果您已经安装了PCNTL扩展名并使用了PHP 7.1或更高版本,则可以注册基于信号的心跳发件人。
<?php $ sender = new pcntlheartbeatsender($ connection); $ sender-> regission(); ...代码$ sender-> unregister();
如果您想知道协议级别上发生了什么,请在代码中添加以下常数:
<?phpdefine('amqp_debug',true); ...更多代码?>
运行出版/消费基准类型:
$制作基准
请有关详细信息,请参阅贡献。
如果您仍然想使用该协议的旧版本,则可以通过在配置代码中设置以下常数来执行此操作:
define('amqp_protocol','0.8');
默认值为'0.9.1'
。
如果由于某种原因您不想使用作曲家,则需要在图书馆类中安装自动加载器。人们据报道,将这种自动加载器成功使用。
以下是原始的读数文件内容。学分是原始作者。
PHP库实施高级消息排队协议(AMQP)。
该库是Py-Amqplib的Python代码港口http://barryp.org/software/py-amqplib/
它已经使用RabbitMQ服务器进行了测试。
项目主页:http://code.google.com/p/php-amqplib/
进行讨论,请加入小组:
http://groups.google.com/group/php-amqplib-devel
对于错误报告,请在项目页面上使用错误跟踪系统。
补丁非常欢迎!
作者:vadim zaliva [email protected]