該庫是AMQP 0-9-1協議的純PHP實現。它已經針對RabbitMQ進行了測試。
該圖書館用於行動中的RabbitMQ和官方RabbitMQ教程的PHP例子。
請注意,該項目已使用貢獻者的行為準則發布。通過參加該項目,您同意遵守其條款。
感謝Videlalvaro和PostalService14創建php-amqplib
。
該包現在由RamūnasDronga,Luke Bakken和幾位從事RabbitMQ工作的VMware工程師維護。
從版本2.0開始,該庫默認使用AMQP 0.9.1
,因此需要RabbitMQ 2.0或更高版本。通常,服務器升級不需要任何應用程序代碼更改,因為協議很少更改,但請在升級前進行自己的測試。
由於庫使用AMQP 0.9.1
我們添加了對以下RabbitMQ擴展的支持:
交換綁定
基本nack
出版商確認
消費者取消通知
還支持修改現有方法(例如alternate exchanges
的擴展。
Enqueue/AMQP-LIB是AMQP Interop兼容包裝器。
Amqproxy是一個具有連接和頻道池/重複使用的代理庫。當使用PHP-AMQPLIB時,這允許較低的連接和通道流動,從而減少了CPU的RabbitMQ。
確保您安裝了作曲家,然後運行以下命令:
$作曲家需要php-amqplib/php-amqplib
這將在供應商文件夾中獲取庫及其依賴關係。然後,您可以將以下內容添加到.php文件中以使用庫
require_once __dir __。
然後,您需要use
相關類,例如:
使用phpamqplibConnectionAmqpStreamConnection;使用phpamqplibmessageamqpmessage;
隨著RabbitMQ的運行,打開兩個終端,第一個終端執行以下命令來啟動消費者:
$ CD PHP-AMQPLIB/DEMO $ php amqp_consumer.php
然後在另一個終端上做:
$ CD PHP-AMQPLIB/DEMO $ php amqp_publisher.php一些文本要發布
您應該看到消息到達另一個終端的過程
然後停止消費者,將quit
消息發送到它:
$ php amqp_publisher.php退出
如果您需要收聽用於連接到RabbitMQ的插座,請在非阻止消費者中查看示例。
$ PHP AMQP_CONSUMER_NON_BLOCKING.PHP
有關更多信息,請參閱ChangElog最近發生了什麼變化。
http://php-amqplib.github.io/php-amqplib/
為了不重複自己,如果您想了解有關此圖書館的更多信息,請參考官方的RabbitMQ教程。
amqp_ha_consumer.php
:演示鏡像隊列的使用。
amqp_consumer_exclusive.php
和amqp_publisher_exclusive.php
:使用獨家隊列的演示粉絲交換。
amqp_consumer_fanout_{1,2}.php
和amqp_publisher_fanout.php
:demos fanout fanout交換與命名Queues。
amqp_consumer_pcntl_heartbeat.php
:基於演示信號的心跳發件人的用法。
basic_get.php
:DEMO通過使用基本GET AMQP調用從隊列中獲取消息。
如果您的應用程序可以連接到一個多個節點群,則可以使用一系列主機開始連接。為此,您應該使用create_connection
靜態方法。
例如:
$ connection = amqpstreamConnection :: create_connection([[ ['host'=> host1,'port'=> port,'user'=>用戶,'password'=> pass,'vhost'=> vhost], ['host'=> host2,'port'=>端口,'user'=>用戶,'password'=> pass,'vhost'=> vhost] ],$ options);
此代碼將嘗試首先連接到HOST1
,並在第一個連接失敗時連接到HOST2
。該方法返回第一個成功連接的連接對象。如果所有連接失敗,它將引發最後一次連接嘗試的異常。
有關更多示例,請參見demo/amqp_connect_multiple_hosts.php
。
假設您有一個過程,該過程會生成一堆消息,這些消息將使用相同的routing_key
和Onsporty(例如mandatory
發佈到同一exchange
。然後,您可以使用batch_basic_publish
庫功能。您可以這樣批處理消息:
$ msg = new amqpmessage($ msg_body); $ ch-> batch_basic_publish($ msg,$ exchange); $ msg2 = new amqpmessage($ msg_body); $ ch-> batch_bath_bast_basic_publish($ msg2,$ msg2,$ exchange);
然後發送批次:
$ ch-> prublice_batch();
假設我們的程序需要從文件中讀取,然後每行發布一條消息。根據消息大小,您將不得不決定何時發送批次。您可以每50條或一百個消息發送每50條。這取決於你。
加快消息發布的另一種方法是重複使用AMQPMessage
消息實例。您可以創建這樣的新消息:
$ properties = array('content_type'=>'text/plain','velivery_mode'=> amqpmessage :: drively_mode_persistent); $ msg = new amqpmessage($ ch- hody,$ properties; $ ch--> basic> basic> basic_perprish($ msg,$ msg,$ msg,$交換);
現在,假設您想更改消息主體以獲取未來消息,但您將保留相同的屬性,也就是說,您的消息仍將是text/plain
,並且delivery_mode
仍將是AMQPMessage::DELIVERY_MODE_PERSISTENT
。如果為每個已發布的消息創建一個新的AMQPMessage
實例,則必須以AMQP二進制格式重新編碼這些屬性。您可以通過僅重用AMQPMessage
,然後以這樣的方式重置消息主體來避免所有這些問題:
$ msg-> setBody($ body2); $ ch-> basic_publish($ msg,$ exchange);
AMQP對消息的大小沒有限制;如果消費者收到一條非常大的消息,則可以在調用basic_consume
回調之前,在庫中達到PHP的內存限制。
為避免這種情況,您可以在頻道實例上調用方法AMQPChannel::setBodySizeLimit(int $bytes)
。超過此限制的車身尺寸將被截斷,並通過AMQPMessage::$is_truncated
標誌設置為true
。屬性AMQPMessage::$body_size
將反映接收到的消息的真實身體大小,如果消息已截斷,該消息將高於strlen(AMQPMessage::getBody())
。
請注意,限制上方的所有數據均可從AMQP頻道讀取並立即丟棄,因此無法在您的回調中檢索它。如果您有另一個可以處理具有較大有效載荷的消息的消費者,則可以使用basic_reject
或basic_nack
告訴服務器(仍然具有完整副本)將其轉發到死信交換。
默認情況下,不會發生截斷。要禁用已啟用它的通道上的截斷,請將0
(或null
)傳遞到AMQPChannel::setBodySizeLimit()
。
在網絡錯誤時,一些使用自動連接恢復機制使用自動連接恢復機制來重新連接和恢復頻道和消費者的客戶。
由於該客戶端使用單線程,因此您可以使用異常處理機制來設置連接恢復。
在連接錯誤的情況下可能會拋出的例外:
phpamqplibexceptionAmqpConnectionClosedexceptionphpamqplibexceptionAmqpioExceptionRuntimeExceptionErrorexception
可能會拋出其他一些例外,但是連接仍然可以存在。在重新連接之前處理異常時,清理舊連接總是一個好主意。
例如,如果要設置恢復連接:
$ connection = null; $ channel = null; while(true){try {$ connection = new AmqpStreamConnection(主機,端口,用戶,pass,vhost); // //您的應用程序代碼轉到此處。do_something_with_with_connection( $ connection); } catch(amqpruntimeException $ e){echo $ e-> getMessage(); clearup_connection($ connection); usleep(wait_before_reconnect_us); } catch(runtimeException $ e){callup_connection($ connection); usleep(wait_before_reconnect_us); } catch(errorexception $ e){callup_connection($ connection); usleep(wait_before_reconnect_us); } }
一個完整的示例是在demo/connection_recovery_consume.php
中。
每次發生異常時,此代碼將重新連接並重試應用代碼。一些例外仍然可以被拋出,不應作為重新連接過程的一部分處理,因為它們可能是應用程序錯誤。
這種方法主要對於消費者應用程序是有意義的,生產者將需要一些其他應用程序代碼,以避免多次發布相同的消息。
這是一個最簡單的例子,在現實生活中,您可能需要控制reter計數,並可能優雅地降低等待時間重新連接。
您可以在#444中找到一個更過大的例子
如果您已經安裝了PCNTL擴展,則當消費者不處理消息時,將處理信號。
$ pcntlhandler =函數($ signal){switch($ signal){case sigterm:case sigusr1:case sigint://在停止消費者之前的一些東西,例如delete lock etpepcntl_signal($ signal,sig_dfl); //還原handlerposix_kill(posix_getpid(),$ signal); //用信號殺死自我,請參閱https://www.cons.org/cracauer/sigint.htmlcase sighup://一些可以重新啟動消費者break的東西;默認://什麼nothing}} }; pcntl_signal(sigterm,$ pcntlhandler); pcntl_signal(sigint,$ pcntlhandler); pcntl_signal(sigusr1,$ pcntlhandler); pcntl_signal(sighup,yighup,yighup,$ pcntlhandler);
為了禁用此功能,將常數AMQP_WITHOUT_SIGNALS
定義為true
<?phpdefine('amqp_without_signals',true); ...更多代碼
如果您已經安裝了PCNTL擴展名並使用了PHP 7.1或更高版本,則可以註冊基於信號的心跳發件人。
<?php $ sender = new pcntlheartbeatsender($ connection); $ sender-> regission(); ...代碼$ sender-> unregister();
如果您想知道協議級別上發生了什麼,請在代碼中添加以下常數:
<?phpdefine('amqp_debug',true); ...更多代碼?>
運行出版/消費基準類型:
$製作基準
請有關詳細信息,請參閱貢獻。
如果您仍然想使用該協議的舊版本,則可以通過在配置代碼中設置以下常數來執行此操作:
define('amqp_protocol','0.8');
默認值為'0.9.1'
。
如果由於某種原因您不想使用作曲家,則需要在圖書館類中安裝自動加載器。人們據報導,將這種自動加載器成功使用。
以下是原始的讀數文件內容。學分是原始作者。
PHP庫實施高級消息排隊協議(AMQP)。
該庫是Py-Amqplib的Python代碼港口http://barryp.org/software/py-amqplib/
它已經使用RabbitMQ服務器進行了測試。
項目主頁:http://code.google.com/p/php-amqplib/
進行討論,請加入小組:
http://groups.google.com/group/php-amqplib-devel
對於錯誤報告,請在項目頁面上使用錯誤跟踪系統。
補丁非常歡迎!
作者:vadim zaliva [email protected]