PHP-rdkafka 是一個基於 librdkafka 的穩定、可用於生產且快速的PHP Kafka 用戶端。
目前版本支援 PHP >= 8.1.0、librdkafka >= 1.5.3、Kafka >= 0.8。版本 6.x 支援 PHP 7.x..8.x、librdkafka 0.11..2.x。舊版支援 PHP 5。
擴展的目標是成為一個專注於生產和長期支援的低階非固定 librdkafka 綁定。
支援高級和低階消費者、生產者和元資料API。
文件可在此處取得。
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
下面使用的設定參數可以在Librdkafka配置參考中找到
對於生產,我們首先需要創建一個生產者,並在其中添加代理(Kafka 伺服器):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
警告確保您的生產者遵循正確的關閉(見下文)以免丟失訊息。
接下來,我們從生產者建立一個主題實例:
<?php
$ topic = $ rk -> newTopic ( " test " );
從那裡,我們可以使用 Produce 方法產生任意數量的消息:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
第一個參數是分區。 RD_KAFKA_PARTITION_UA 代表unsigned ,讓 librdkafka 選擇分割區。
第二個參數是訊息標誌,應該是 0
或RD_KAFKA_MSG_F_BLOCK
以阻止滿隊列上的生產。訊息有效負載可以是任何內容。
這應該在銷毀生產者實例之前完成
確保所有排隊和進行中的產品請求均已完成
在終止之前。為$timeout_ms
使用合理的值。
警告不呼叫flush可能會導致訊息遺失!
$ rk -> flush ( $ timeout_ms );
如果您不關心發送尚未發送的訊息,可以在呼叫flush()
purge()
:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
RdKafkaKafkaConsumer 類別支援自動分區分配/撤銷。請參閱此處的範例。
注意低階消費者是舊版 API,請優先使用高階消費者
我們首先需要創建一個低階的消費者,並在其中加入代理(Kafka 伺服器):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
接下來,透過呼叫newTopic()
方法來建立主題實例,並開始在分區 0 上消費:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
接下來,檢索消費的訊息:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
注意低階消費者是舊版 API,請優先使用高階消費者
從多個主題和/或分區消費可以透過告訴 librdkafka 將所有訊息從這些主題/分區轉發到內部佇列,然後從該佇列消費來完成:
建立隊列:
<?php
$ queue = $ rk -> newQueue ();
將主題分區加入佇列:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
接下來,從佇列中檢索已消費的訊息:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka 預設在代理程式上儲存偏移量。
如果您使用本機檔案進行偏移存儲,則預設情況下該檔案會在目前目錄中創建,其名稱基於主題和分割區。可以透過設定offset.store.path
配置屬性來變更目錄。
若要手動控制偏移量,請將enable.auto.offset.store
設定為false
。
設定auto.commit.interval.ms
和auto.commit.enable
將控制
儲存的偏移量是否將自動提交給代理以及在哪個時間間隔。
若要手動控制偏移量,請將enable.auto.commit
設定為false
。
為高階消費者消費訊息的呼叫之間允許的最大時間。
如果超過此間隔,則消費者被視為失敗,並且該組將
重新平衡,以便將分區重新分配給另一個消費者群組成員。
group.id
負責設定您的消費者群組 ID,它應該是唯一的(並且不應更改)。 Kafka 使用它來識別應用程式並為其儲存偏移量。
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Librdkafka 配置參考
預設情況下,librdkafka 將為每個使用的分割區緩衝最多 1GB 的訊息。您可以透過減少使用者的queued.max.messages.kbytes
參數的值來降低記憶體使用量。
每個消費者和生產者實例將依照topic.metadata.refresh.interval.ms
參數定義的時間間隔取得主題元資料。根據您的 librdkafka 版本,該參數預設為 10 秒或 600 秒。
librdkafka 預設會取得叢集所有主題的元資料。將topic.metadata.refresh.sparse
設為字串"true"
可確保 librdkafka 僅取得他使用的主題。
將topic.metadata.refresh.sparse
設為"true"
,並將topic.metadata.refresh.interval.ms
設為 600 秒(加上一些抖動)可以大量減少頻寬,具體取決於消費者和主題的數量。
此設定允許 librdkafka 執行緒在 librdkafka 處理完畢後立即終止。這有效地允許您的 PHP 進程/請求快速終止。
啟用此功能時,您必須像這樣屏蔽訊號:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
代理套接字操作可能阻塞的最長時間。較低的值可以提高反應能力,但代價是 CPU 使用率稍高。
減小該設定的值可提高關閉速度。該值定義 librdkafka 在讀取循環的一次迭代中阻塞的最長時間。這也定義了 librdkafka 主執行緒檢查終止的頻率。
這定義了 librdkafka 在發送一批訊息之前等待的最大時間和預設時間。將此設定減少到例如 1ms 可確保訊息盡快發送,而不是大量發送。
這已被證明可以減少 rdkafka 實例以及 PHP 進程/請求的關閉時間。
這是針對低延遲進行最佳化的配置。這允許 PHP 進程/請求盡快發送訊息並快速終止。
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
建議定期呼叫 poll 來提供回呼服務。在php-rdkafka:3.x
中
poll 在關閉期間也會被調用,因此不定期調用它可能會
導致停機時間稍長。下面的範例進行輪詢,直到佇列中不再有事件為止:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
文件的來源可以在這裡找到
如果文件還不夠,請隨時在 Gitter 或 Google Groups 上的 php-rdkafka 頻道上提問。
因為您的 IDE 無法自動發現 php-rdkadka api,您可以考慮使用外部套件,為 php-rdkafka 類別、函數和常數提供一組存根:kwn/php-rdkafka-stubs
如果您願意貢獻,謝謝:)
在開始之前,請查看貢獻文檔,以了解如何合併您的變更。
從 librdkafka 複製的文檔。
作者:參見貢獻者。
php-rdkafka 在 MIT 許可證下發布。