kafka php
V0.2.0.8
中文文档
Kafka-php是一个纯PHP的kafka客户端,目前支持大于0.8.x版本的Kafka,本项目v0.2.x和v0.1.x不兼容,如果使用原来的v0.1.x可以参考文档 Kafka PHP v0.1.x 文档,但建议切换到 v0.2.x 。 v0.2.x使用PHP异步实现与kafkabroker交互,比v0.1.x更稳定高效,因为使用PHP语言所以不用编译任何扩展即可使用,减少访问和维护成本
将 lib 目录添加到 PHP include_path 并使用类似于示例目录中的自动加载器(代码遵循 PEAR/Zend 每个文件一个类的约定)。
如果您使用 Composer 来管理项目的依赖项,只需将依赖项nmred/kafka-php
添加到您的项目中即可。
$ composer require nmred/kafka-php
这是composer.json 文件的最小示例:
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
配置属性记录在配置中
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer (
function () {
return [
[
' topic ' => ' test ' ,
' value ' => ' test....message. ' ,
' key ' => ' testkey ' ,
],
];
}
);
$ producer -> setLogger ( $ logger );
$ producer -> success ( function ( $ result ) {
var_dump ( $ result );
});
$ producer -> error ( function ( $ errorCode ) {
var_dump ( $ errorCode );
});
$ producer -> send ( true );
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 127.0.0.1:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer ();
$ producer -> setLogger ( $ logger );
for ( $ i = 0 ; $ i < 100 ; $ i ++) {
$ producer -> send ([
[
' topic ' => ' test1 ' ,
' value ' => ' test1....message. ' ,
' key ' => '' ,
],
]);
}
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ConsumerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setGroupId ( ' test ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setTopics ([ ' test ' ]);
// $ config - > setOffsetReset ( 'earliest' );
$ consumer = new Kafka Consumer ();
$ consumer -> setLogger ( $ logger );
$ consumer -> start ( function ( $ topic , $ part , $ message ) {
var_dump ( $ message );
});
参考示例
第一组:531522091 第二组:657517955