PHP-rdkafka 是一个基于 librdkafka 的稳定、可用于生产且快速的PHP Kafka 客户端。
当前版本支持 PHP >= 8.1.0、librdkafka >= 1.5.3、Kafka >= 0.8。版本 6.x 支持 PHP 7.x..8.x、librdkafka 0.11..2.x。旧版本支持 PHP 5。
扩展的目标是成为一个专注于生产和长期支持的低级非固定 librdkafka 绑定。
支持高级和低级消费者、生产者和元数据API。
文档可在此处获取。
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
下面使用的配置参数可以在Librdkafka配置参考中找到
对于生产,我们首先需要创建一个生产者,并向其中添加代理(Kafka 服务器):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
警告确保您的生产者遵循正确的关闭(见下文)以免丢失消息。
接下来,我们从生产者创建一个主题实例:
<?php
$ topic = $ rk -> newTopic ( " test " );
从那里,我们可以使用 Produce 方法生成任意数量的消息:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
第一个参数是分区。 RD_KAFKA_PARTITION_UA 代表unsigned ,让 librdkafka 选择分区。
第二个参数是消息标志,应该是 0
或RD_KAFKA_MSG_F_BLOCK
以阻止满队列上的生产。消息有效负载可以是任何内容。
这应该在销毁生产者实例之前完成
确保所有排队和正在进行的生产请求均已完成
在终止之前。为$timeout_ms
使用合理的值。
警告不调用flush可能会导致消息丢失!
$ rk -> flush ( $ timeout_ms );
如果您不关心发送尚未发送的消息,可以在调用flush()
purge()
:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
RdKafkaKafkaConsumer 类支持自动分区分配/撤销。请参阅此处的示例。
注意低级别消费者是旧版 API,请优先使用高级消费者
我们首先需要创建一个低级消费者,并向其中添加代理(Kafka 服务器):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
接下来,通过调用newTopic()
方法创建一个主题实例,并开始在分区 0 上消费:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
接下来,检索消费的消息:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
注意低级别消费者是旧版 API,请优先使用高级消费者
从多个主题和/或分区消费可以通过告诉 librdkafka 将所有消息从这些主题/分区转发到内部队列,然后从该队列消费来完成:
创建队列:
<?php
$ queue = $ rk -> newQueue ();
将主题分区添加到队列中:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
接下来,从队列中检索已消费的消息:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka 默认在代理上存储偏移量。
如果您使用本地文件进行偏移存储,则默认情况下该文件会在当前目录中创建,其名称基于主题和分区。可以通过设置offset.store.path
配置属性来更改目录。
要手动控制偏移量,请将enable.auto.offset.store
设置为false
。
设置auto.commit.interval.ms
和auto.commit.enable
将控制
存储的偏移量是否将自动提交给代理以及在哪个时间间隔。
要手动控制偏移量,请将enable.auto.commit
设置为false
。
为高级消费者消费消息的调用之间允许的最大时间。
如果超过此间隔,则消费者被视为失败,并且该组将
重新平衡,以便将分区重新分配给另一个消费者组成员。
group.id
负责设置您的消费者组 ID,它应该是唯一的(并且不应更改)。 Kafka 使用它来识别应用程序并为其存储偏移量。
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Librdkafka 配置参考
默认情况下,librdkafka 将为每个使用的分区缓冲最多 1GB 的消息。您可以通过减少使用者上的queued.max.messages.kbytes
参数的值来降低内存使用量。
每个消费者和生产者实例将按照topic.metadata.refresh.interval.ms
参数定义的时间间隔获取主题元数据。根据您的 librdkafka 版本,该参数默认为 10 秒或 600 秒。
librdkafka 默认获取集群所有主题的元数据。将topic.metadata.refresh.sparse
设置为字符串"true"
可确保 librdkafka 仅获取他使用的主题。
将topic.metadata.refresh.sparse
设置为"true"
,并将topic.metadata.refresh.interval.ms
设置为 600 秒(加上一些抖动)可以大量减少带宽,具体取决于消费者和主题的数量。
此设置允许 librdkafka 线程在 librdkafka 处理完毕后立即终止。这有效地允许您的 PHP 进程/请求快速终止。
启用此功能时,您必须像这样屏蔽信号:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
代理套接字操作可能阻塞的最长时间。较低的值可提高响应能力,但代价是 CPU 使用率稍高。
减小该设置的值可提高关闭速度。该值定义 librdkafka 在读循环的一次迭代中阻塞的最长时间。这也定义了 librdkafka 主线程检查终止的频率。
这定义了 librdkafka 在发送一批消息之前等待的最大时间和默认时间。将此设置减少到例如 1ms 可确保消息尽快发送,而不是批量发送。
这已被证明可以减少 rdkafka 实例以及 PHP 进程/请求的关闭时间。
这是针对低延迟进行优化的配置。这允许 PHP 进程/请求尽快发送消息并快速终止。
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
建议定期调用 poll 来提供回调服务。在php-rdkafka:3.x
中
poll 在关闭期间也会被调用,因此不定期调用它可能会
导致停机时间稍长一些。下面的示例进行轮询,直到队列中不再有事件为止:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
文档的来源可以在这里找到
如果文档还不够,请随时在 Gitter 或 Google Groups 上的 php-rdkafka 频道上提问。
因为您的 IDE 无法自动发现 php-rdkadka api,您可以考虑使用外部包,为 php-rdkafka 类、函数和常量提供一组存根:kwn/php-rdkafka-stubs
如果您愿意贡献,谢谢:)
在开始之前,请查看贡献文档,了解如何合并您的更改。
从 librdkafka 复制的文档。
作者:参见贡献者。
php-rdkafka 在 MIT 许可证下发布。