kafka php
V0.2.0.8
中文文檔
Kafka-php是純PHP的kafka客戶端,目前支援大於0.8.x版本的Kafka,本專案v0.2.x和v0.1.x不相容,如果使用原來的v0.1.x可以參考文檔Kafka PHP v0.1.x 文檔,但建議切換到v0.2.x 。 v0.2.x使用PHP非同步實現與kafkabroker交互,比v0.1.x更穩定高效,因為使用PHP語言所以不用編譯任何擴展即可使用,減少訪問和維護成本
將 lib 目錄新增至 PHP include_path 並使用類似範例目錄中的自動載入器(程式碼遵循 PEAR/Zend 每個檔案一個類別的約定)。
如果您使用 Composer 來管理專案的依賴項,只需將依賴項nmred/kafka-php
新增至您的專案中即可。
$ composer require nmred/kafka-php
這是composer.json 檔案的最小範例:
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
配置屬性記錄在配置中
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer (
function () {
return [
[
' topic ' => ' test ' ,
' value ' => ' test....message. ' ,
' key ' => ' testkey ' ,
],
];
}
);
$ producer -> setLogger ( $ logger );
$ producer -> success ( function ( $ result ) {
var_dump ( $ result );
});
$ producer -> error ( function ( $ errorCode ) {
var_dump ( $ errorCode );
});
$ producer -> send ( true );
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 127.0.0.1:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer ();
$ producer -> setLogger ( $ logger );
for ( $ i = 0 ; $ i < 100 ; $ i ++) {
$ producer -> send ([
[
' topic ' => ' test1 ' ,
' value ' => ' test1....message. ' ,
' key ' => '' ,
],
]);
}
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ConsumerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setGroupId ( ' test ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setTopics ([ ' test ' ]);
// $ config - > setOffsetReset ( 'earliest' );
$ consumer = new Kafka Consumer ();
$ consumer -> setLogger ( $ logger );
$ consumer -> start ( function ( $ topic , $ part , $ message ) {
var_dump ( $ message );
});
參考範例
第一組:531522091 第二組:657517955