PHP-rdkafka は、librdkafka に基づいた安定した、本番環境に対応した、高速なPHP 用の Kafka クライアントです。
現在のバージョンは、PHP >= 8.1.0、librdkafka >= 1.5.3、Kafka >= 0.8 をサポートしています。バージョン 6.x は、PHP 7.x..8.x、librdkafka 0.11..2.x をサポートします。古いバージョンは PHP 5 をサポートしています。
拡張機能の目標は、運用と長期サポートに焦点を当てた、意見のない低レベルの librdkafka バインディングになることです。
高レベルおよび低レベルのコンシューマ API 、プロデューサ API 、およびメタデータAPI がサポートされています。
ドキュメントはここから入手できます。
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
以下で使用される構成パラメータは、Librdkafka 構成リファレンスにあります。
プロデュースするには、まずプロデューサーを作成し、それにブローカー (Kafka サーバー) を追加する必要があります。
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
警告メッセージが失われないように、プロデューサが適切なシャットダウン (以下を参照) に従っていることを確認してください。
次に、プロデューサーからトピック インスタンスを作成します。
<?php
$ topic = $ rk -> newTopic ( " test " );
そこから、Produce メソッドを使用して、必要なだけメッセージを生成できます。
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
最初の引数はパーティションです。 RD_KAFKA_PARTITION_UA はunassignedを表し、librdkafka にパーティションを選択させます。
2 番目の引数はメッセージ フラグであり、0 のいずれかである必要があります。
またはRD_KAFKA_MSG_F_BLOCK
て、キューがいっぱいになったときに生成物をブロックします。メッセージ ペイロードは何でも構いません。
これは、プロデューサー インスタンスを破棄する前に行う必要があります。
キューに入れられたすべての実行中のプロデュースリクエストが完了していることを確認するため
終了する前に。 $timeout_ms
には適切な値を使用してください。
警告flash を呼び出さないと、メッセージが失われる可能性があります。
$ rk -> flush ( $ timeout_ms );
まだ送信されていないメッセージの送信を気にしない場合は、 flush()
呼び出す前にpurge()
使用できます。
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
RdKafkaKafkaConsumer クラスは、自動パーティション割り当て/取り消しをサポートします。ここの例を参照してください。
注:低レベルのコンシューマはレガシー API です。高レベルのコンシューマの使用を優先してください。
まず低レベルのコンシューマーを作成し、それにブローカー (Kafka サーバー) を追加する必要があります。
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
次に、 newTopic()
メソッドを呼び出してトピック インスタンスを作成し、パーティション 0 での使用を開始します。
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
次に、消費されたメッセージを取得します。
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
注:低レベルのコンシューマはレガシー API です。高レベルのコンシューマの使用を優先してください。
複数のトピックやパーティションからの消費は、これらのトピックやパーティションからのすべてのメッセージを内部キューに転送するように librdkafka に指示し、このキューから消費することで実行できます。
キューの作成:
<?php
$ queue = $ rk -> newQueue ();
トピック パーティションをキューに追加します。
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
次に、消費されたメッセージをキューから取得します。
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka はデフォルトでオフセットをブローカーに保存します。
オフセット ストレージにローカル ファイルを使用している場合、デフォルトでは、ファイルはトピックとパーティションに基づいた名前で現在のディレクトリに作成されます。ディレクトリは、 offset.store.path
構成プロパティを設定することで変更できます。
オフセットを手動で制御するには、 enable.auto.offset.store
false
に設定します。
auto.commit.interval.ms
およびauto.commit.enable
設定によって制御されます。
保存されたオフセットがブローカーに自動的にコミットされるかどうか、およびその間隔。
オフセットを手動で制御するには、 enable.auto.commit
false
に設定します。
高レベルのコンシューマのメッセージを消費するための呼び出し間の最大許容時間。
この間隔を超えると、コンシューマは失敗したとみなされ、グループは
パーティションを別のコンシューマ・グループ・メンバーに再割り当てするには、リバランスを実行します。
group.id
はコンシューマ グループ ID の設定を担当し、一意である必要があります (変更しないでください)。 Kafka はこれを使用してアプリケーションを認識し、そのオフセットを保存します。
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Librdkafka 構成リファレンス
librdkafka は、デフォルトで、消費されたパーティションごとに最大 1GB のメッセージをバッファーします。コンシューマのqueued.max.messages.kbytes
パラメータの値を減らすことで、メモリ使用量を減らすことができます。
各コンシューマーおよびプロデューサーのインスタンスは、 topic.metadata.refresh.interval.ms
パラメーターで定義された間隔でトピックのメタデータをフェッチします。 librdkafka のバージョンに応じて、パラメータのデフォルトは 10 秒または 600 秒になります。
librdkafka は、デフォルトでクラスターのすべてのトピックのメタデータを取得します。 topic.metadata.refresh.sparse
文字列"true"
に設定すると、librdkafka は使用するトピックのみをフェッチします。
topic.metadata.refresh.sparse
を"true"
に設定し、 topic.metadata.refresh.interval.ms
を 600 秒 (および若干のジッター) に設定すると、コンシューマとトピックの数に応じて、帯域幅が大幅に削減される可能性があります。
この設定により、librdkafka の処理が完了するとすぐに librdkafka スレッドを終了できます。これにより、PHP プロセス/リクエストを効果的に迅速に終了できるようになります。
これを有効にする場合は、次のように信号をマスクする必要があります。
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
ブローカーソケット操作がブロックされる最大時間。値を低くすると、CPU 使用率が若干高くなりますが、応答性は向上します。
この設定の値を小さくすると、シャットダウン速度が向上します。この値は、読み取りループの 1 回の反復で librdkafka がブロックする最大時間を定義します。これは、メインの librdkafka スレッドが終了を確認する頻度も定義します。
これは、メッセージのバッチを送信する前に librdkafka が待機する最大時間とデフォルト時間を定義します。この設定をたとえば 1ms に減らすと、メッセージがバッチ化されるのではなく、できるだけ早く送信されるようになります。
これにより、rdkafka インスタンスと PHP プロセス/リクエストのシャットダウン時間が短縮されることが確認されています。
ここでは、低遅延のために最適化された構成を示します。これにより、PHP プロセス/リクエストはできるだけ早くメッセージを送信し、すぐに終了することができます。
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
コールバックを処理するには、定期的にポーリングを呼び出すことをお勧めします。 php-rdkafka:3.x
の場合
ポーリングはシャットダウン中にも呼び出されたため、定期的に呼び出さないと、
シャットダウンに少し時間がかかります。以下の例では、キューにイベントがなくなるまでポーリングします。
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
ドキュメントのソースはここにあります
ドキュメントが十分でない場合は、Gitter の php-rdkafka チャネルまたは Google グループでお気軽に質問してください。
IDE は php-rdkadka API を自動検出できないため、php-rdkafka クラス、関数、定数のスタブのセットを提供する外部パッケージの使用を検討できます: kwn/php-rdkafka-stubs
貢献したい場合は、よろしくお願いします:)
始める前に、CONTRIBUTING ドキュメントを参照して、変更をマージする方法を確認してください。
ドキュメントは librdkafka からコピーされました。
著者: 寄稿者を参照。
php-rdkafka は MIT ライセンスに基づいてリリースされています。