中文文档
Kafka-php ist ein reiner PHP-Kafka-Client, der derzeit eine höhere Version von Kafka als 0.8.x unterstützt. Dieses Projekt v0.2.x und v0.1.x ist nicht kompatibel, wenn die ursprüngliche Version v0.1.x verwendet wird. Sie können sich auf die beziehen Dokument Kafka PHP v0.1.x Dokument, es wird jedoch empfohlen, zu v0.2.x zu wechseln. v0.2.x verwendet die asynchrone PHP-Implementierung und die Kafka-Broker-Interaktion, was stabiler als v0.1.x ist und effizienter ist, da die PHP-Sprache verwendet werden muss, um keine Erweiterungen zu kompilieren, um die Zugriffs- und Wartungskosten zu senken
Fügen Sie das lib-Verzeichnis zum PHP-include_path hinzu und verwenden Sie einen Autoloader wie den im Beispielverzeichnis (der Code folgt der PEAR/Zend-Konvention „Eine Klasse pro Datei“).
Fügen Sie Ihrem Projekt einfach eine Abhängigkeit nmred/kafka-php
hinzu, wenn Sie Composer zum Verwalten der Abhängigkeiten Ihres Projekts verwenden.
$ composer require nmred/kafka-php
Hier ist ein minimales Beispiel einer Composer.json-Datei:
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
Konfigurationseigenschaften sind in Konfiguration dokumentiert
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer (
function () {
return [
[
' topic ' => ' test ' ,
' value ' => ' test....message. ' ,
' key ' => ' testkey ' ,
],
];
}
);
$ producer -> setLogger ( $ logger );
$ producer -> success ( function ( $ result ) {
var_dump ( $ result );
});
$ producer -> error ( function ( $ errorCode ) {
var_dump ( $ errorCode );
});
$ producer -> send ( true );
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 127.0.0.1:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer ();
$ producer -> setLogger ( $ logger );
for ( $ i = 0 ; $ i < 100 ; $ i ++) {
$ producer -> send ([
[
' topic ' => ' test1 ' ,
' value ' => ' test1....message. ' ,
' key ' => '' ,
],
]);
}
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ConsumerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setGroupId ( ' test ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setTopics ([ ' test ' ]);
// $ config - > setOffsetReset ( 'earliest' );
$ consumer = new Kafka Consumer ();
$ consumer -> setLogger ( $ logger );
$ consumer -> start ( function ( $ topic , $ part , $ message ) {
var_dump ( $ message );
});
Siehe Beispiel
Gruppe 1: 531522091 Gruppe 2: 657517955