このライブラリは、AMQP 0-9-1プロトコルの純粋なPHP実装です。 Rabbitmqに対してテストされています。
ライブラリは、RabbitMQのPHP例と公式のRabbitMQチュートリアルに使用されました。
このプロジェクトは、貢献者行動規範でリリースされていることに注意してください。このプロジェクトに参加することにより、その条件を順守することに同意します。
php-amqplib
作成してくれたVidelalvaroとPostalservice14に感謝します。
このパッケージは現在、RamūnasDronga、Luke Bakken、およびRabbitmqに取り組んでいるいくつかのVMwareエンジニアによって維持されています。
バージョン2.0から始めて、このライブラリはデフォルトでAMQP 0.9.1
使用するため、RabbitMQ 2.0以降のバージョンが必要です。通常、サーバーのアップグレードは、プロトコルが非常にまれに変更されるため、アプリケーションコードの変更を必要としませんが、アップグレードする前に独自のテストを実施してください。
ライブラリはAMQP 0.9.1
使用しているため、次のRabbitMQ拡張機能のサポートを追加しました。
バインディングを交換するために交換します
基本的なナック
出版社は確認します
消費者は通知をキャンセルします
alternate exchanges
などの既存のメソッドを変更する拡張機能もサポートされています。
Enqueue/amqp-libは、AMQPインターナップ互換ラッパーです。
AmqProxyは、接続とチャネルのプーリング/再利用を備えたプロキシライブラリです。これにより、PHP-AMQPLIBを使用すると、接続とチャネルの解約が低下し、RabbitMQのCPU使用量が少なくなります。
作曲家がインストールされていることを確認してから、次のコマンドを実行します。
$ Composerには、PHP-AMQPLIB/PHP-AMQPLIBが必要です
これにより、ベンダーフォルダー内のライブラリとその依存関係が取得されます。次に、ライブラリを使用するために.phpファイルに次のことを追加できます
require_once __dir __。 '/vendor/autoload.php';
次に、関連するクラスuse
必要があります。
phpamqplibconnectionAmqptStreamConnectionを使用します。PHPAMQPLIBMESSAGEAMQPMESSAGEを使用します。
RabbitMQが2つの端末を開いており、最初の端末で次のコマンドを実行して消費者を開始します。
$ CD PHP-AMQPLIB/DEMO $ php amqp_consumer.php
次に、他の端末で:
$ CD PHP-AMQPLIB/DEMO $ php amqp_publisher.php公開するテキスト
他の端末のプロセスに到着するメッセージが表示されます
その後、消費者を止めるには、 quit
メッセージを送信してください。
$ php amqp_publisher.php quit
RabbitMQに接続するために使用されるソケットを聴く必要がある場合は、非ブロック消費者の例を参照してください。
$ php amqp_consumer_non_blocking.php
最近変更されたものについては、Changelogをご覧ください。
http://php-amqplib.github.io/php-amqplib/
自分自身を繰り返さないように、このライブラリについて詳しく知りたい場合は、公式のrabbitmqチュートリアルを参照してください。
amqp_ha_consumer.php
:ミラー化されたキューの使用をデモします。
amqp_consumer_exclusive.php
およびamqp_publisher_exclusive.php
:排他的なキューを使用したデモファンアウト交換。
amqp_consumer_fanout_{1,2}.php
and amqp_publisher_fanout.php
:名前付きキューとのデモファンアウト交換。
amqp_consumer_pcntl_heartbeat.php
:デモ信号ベースのハートビート送信者の使用。
basic_get.php
: basic get amqp呼び出しを使用して、キューからメッセージを取得するデモ。
アプリケーションが接続できる複数のノードのクラスターがある場合は、ホストの配列との接続を開始できます。そのためには、 create_connection
staticメソッドを使用する必要があります。
例えば:
$ connection = amqpstreamconnection :: create_connection([[ ['host' => host1、 'port' => port、 'user' => user、 'password' => pass、 'vhost' => vhost]、 ['host' => host2、 'port' => port、 'user' => user、 'password' => pass、 'vhost' => vhost] ]、$ options);
このコードは、最初にHOST1
に接続し、最初の接続が失敗した場合はHOST2
に接続しようとします。このメソッドは、最初の成功した接続の接続オブジェクトを返します。すべての接続が失敗した場合、最後の接続試行から例外がスローされます。
その他の例については、 demo/amqp_connect_multiple_hosts.php
を参照してください。
同じrouting_key
を使用して同じexchange
に公開されるメッセージとmandatory
のようなオプションを生成するプロセスがあるとします。その後、 batch_basic_publish
ライブラリ機能を使用できます。このようなメッセージをバッチすることができます:
$ msg = new amqpmessage($ msg_body); $ ch-> batch_basic_publish($ msg、$ exchange); $ msg2 = new amqpmessage($ msg_body); $ ch-> batch_basic_publish($ msg2、$ exchange);
そして、このようなバッチを送信します:
$ ch-> publish_batch();
プログラムがファイルから読み取り、1行に1つのメッセージを公開する必要があるとしましょう。メッセージのサイズに応じて、バッチを送信する方が良い場合を決定する必要があります。 50個のメッセージごと、または100個ごとに送信できます。それはあなた次第です。
メッセージの公開をスピードアップする別の方法は、 AMQPMessage
メッセージインスタンスを再利用することです。このような新しいメッセージを作成できます。
$ properties = array( 'content_type' => 'text/plain'、 'delivery_mode' => amqpmessage :: delivery_mode_persistent); $ msg = new amqpmessage($ body、$ properties); $ ch-> basic_publish($ msg、$ $交換);
ここで、将来のメッセージに対してメッセージ本文を変更したい間、同じプロパティを保持します。つまり、メッセージは引き続きtext/plain
であり、 delivery_mode
まだAMQPMessage::DELIVERY_MODE_PERSISTENT
になります。公開されているすべてのメッセージに対して新しいAMQPMessage
インスタンスを作成する場合、それらのプロパティはAMQPバイナリ形式で再エンコードする必要があります。 AMQPMessage
再利用して、このようにメッセージ本文をリセットするだけで、すべてを避けることができます。
$ msg-> setbody($ body2); $ ch-> basic_publish($ msg、$ exchange);
AMQPは、メッセージのサイズに制限を課さない。消費者が非常に大きなメッセージを受信した場合、Callbackがbasic_consume
に渡される前に、ライブラリ内でPHPのメモリ制限に到達することができます。
これを回避するために、チャンネルインスタンスのメソッドAMQPChannel::setBodySizeLimit(int $bytes)
を呼び出すことができます。この制限を超えるボディサイズは切り捨てられ、 AMQPMessage::$is_truncated
フラグをtrue
に設定してコールバックに配信されます。プロパティAMQPMessage::$body_size
受信したメッセージの真のボディサイズを反映します。これは、メッセージが切り捨てられている場合strlen(AMQPMessage::getBody())
よりも高くなります。
制限を超えるすべてのデータはAMQPチャネルから読み取り、すぐに破棄されるため、コールバック内でそれを取得する方法はありません。大きなペイロードでメッセージを処理できる別の消費者がいる場合は、 basic_reject
またはbasic_nack
使用してサーバー(完全なコピーがある)に指示して、死んだ文字交換に転送できます。
デフォルトでは、切り捨ては発生しません。有効になったチャネルで切り捨てを無効にするには、 0
(またはnull
)をAMQPChannel::setBodySizeLimit()
に渡します。
自動接続回復メカニズムを使用して、ネットワークエラーが発生した場合にチャネルと消費者を再接続および回復する一部のRabbitMQクライアント。
このクライアントはシングルスレッドを使用しているため、例外処理メカニズムを使用して接続回復をセットアップできます。
接続エラーの場合にスローされる可能性のある例外:
phpamqplibexceptionAmQConnectionClosedExceptionPlibexceptionAmQPioExceptionRuntimeexceptionErrorexception
他のいくつかの例外がスローされる可能性がありますが、接続はまだそこにある場合があります。再接続する前に例外を処理するときに、古い接続をクリーンアップすることをお勧めします。
たとえば、回復接続を設定する場合:
$ connection = null; $ channel = null; while(true){try {$ connection = new amqpstreamConnection(host、port、user、pass、vhost); //アプリケーションコードはここに掲載されます。 } catch(amqpruntimeexception $ e){echo $ e-> getMessage(); cleanup_connection($ connection); usleep(wait_before_reconnect_us); } catch(runtimeexception $ e){cleanup_connection($ connection); usleep(wait_before_reconnect_us); } catch(errorexception $ e){cleanup_connection($ connection); usleep(wait_before_reconnect_us); } }
完全な例は、 demo/connection_recovery_consume.php
にあります。
このコードは、例外が発生するたびにアプリケーションコードを再接続および再試行します。いくつかの例外はまだスローされる可能性があり、アプリケーションエラーである可能性があるため、再接続プロセスの一部として処理されるべきではありません。
このアプローチは、主に消費者アプリケーションにとって理にかなっています。プロデューサーは、同じメッセージを複数回公開しないように追加のアプリケーションコードを必要とします。
これは最も単純な例でした。実際のアプリケーションでは、RETカウントを制御し、再接続までの待ち時間を優雅に劣化させる可能性があります。
#444で、より過剰な例を見つけることができます
消費者がメッセージを処理していないときに、PCNTL拡張機能ディスパッチのインストールをインストールした場合、信号のディスパッチは処理されます。
$ pcntlhandler = function($ signal){switch($ signal){case sigterm:case sigusr1:case sigint://停止する前のいくつかのものEg delete lock etcpcntl_signal($ signal、sig_dfl); // Handlerposix_killを復元(posix_getpid()、$ signal); //信号で自己を殺す、https://www.cons.org/cracauer/sigint.htmlcase sighup://消費者ブレークを再起動するもの;デフォルト://何もしない}を参照してください。 }; pcntl_signal(sigterm、$ pcntlhandler); pcntl_signal(sigint、$ pcntlhandler); pcntl_signal(sigusr1、$ pcntlhandler); pcntl_signal(sighup、$ pcntlhandler);
この機能を無効にするには、定数AMQP_WITHOUT_SIGNALS
true
として定義するだけです
<?phpdefine( 'amqp_without_signals'、true); ...詳細コード
PCNTL拡張機能をインストールし、PHP 7.1以降を使用している場合は、信号ベースのハートビート送信者を登録できます。
<?php $ sender = new pcntlheartbeatsender($ connection); $ sender-> register(); ... code $ sender-> unregister();
プロトコルレベルで何が起こっているのか知りたい場合は、次の定数をコードに追加します。
<?phpdefine( 'amqp_debug'、true); ...詳細コード?>
パブリッシング/消費ベンチマークタイプを実行するには:
$ベンチマークを作成します
詳細については、寄付をご覧ください。
プロトコルの古いバージョンを使用したい場合は、構成コードで次の定数を設定することでそれを行うことができます。
定義( 'amqp_protocol'、 '0.8');
デフォルト値は'0.9.1'
です。
何らかの理由でComposerを使用したくない場合は、ライブラリクラスにAutoloaderを配置する必要があります。人々はこのオートローダーを成功させて使用することを報告しています。
以下は、元のREADMEファイルコンテンツです。クレジットは元の著者に送られます。
高度なメッセージキューイングプロトコル(AMQP)を実装するPHPライブラリ。
ライブラリは、Py-Amqplib http://barryp.org/software/py-amqplib/のPythonコードのポートです
RabbitMQサーバーでテストされています。
プロジェクトのホームページ:http://code.google.com/p/php-amqplib/
ディスカッションについては、グループに参加してください。
http://groups.google.com/group/php-amqplib-devel
バグレポートについては、プロジェクトページでバグ追跡システムを使用してください。
パッチは大歓迎です!
著者:Vadim Zaliva [email protected]