中文文档
Kafka-php est un client kafka PHP pur qui prend actuellement en charge la version supérieure à 0.8.x de Kafka, ce projet v0.2.x et v0.1.x sont incompatibles si vous utilisez la v0.1.x d'origine. Vous pouvez vous référer au document Kafka PHP v0.1.x Document, mais il est recommandé de passer à v0.2.x . La v0.2.x utilise l'implémentation asynchrone PHP et l'interaction du courtier Kafka, plus stable que la v0.1.x efficace, car l'utilisation du langage PHP ne compile aucune extension peut être utilisée pour réduire les coûts d'accès et de maintenance.
Ajoutez le répertoire lib au chemin d'inclusion PHP et utilisez un chargeur automatique comme celui du répertoire d'exemples (le code suit la convention PEAR/Zend d'une classe par fichier).
Ajoutez simplement une dépendance nmred/kafka-php
à votre projet si vous utilisez Composer pour gérer les dépendances de votre projet.
$ composer require nmred/kafka-php
Voici un exemple minimal de fichier composer.json :
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
Les propriétés de configuration sont documentées dans Configuration
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer (
function () {
return [
[
' topic ' => ' test ' ,
' value ' => ' test....message. ' ,
' key ' => ' testkey ' ,
],
];
}
);
$ producer -> setLogger ( $ logger );
$ producer -> success ( function ( $ result ) {
var_dump ( $ result );
});
$ producer -> error ( function ( $ errorCode ) {
var_dump ( $ errorCode );
});
$ producer -> send ( true );
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 127.0.0.1:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer ();
$ producer -> setLogger ( $ logger );
for ( $ i = 0 ; $ i < 100 ; $ i ++) {
$ producer -> send ([
[
' topic ' => ' test1 ' ,
' value ' => ' test1....message. ' ,
' key ' => '' ,
],
]);
}
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ConsumerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setGroupId ( ' test ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setTopics ([ ' test ' ]);
// $ config - > setOffsetReset ( 'earliest' );
$ consumer = new Kafka Consumer ();
$ consumer -> setLogger ( $ logger );
$ consumer -> start ( function ( $ topic , $ part , $ message ) {
var_dump ( $ message );
});
Référez-vous à l'exemple
Groupe 1 : 531522091 Groupe 2 : 657517955