PHP-rdkafka é um cliente Kafka estável , pronto para produção e rápido para PHP baseado em librdkafka.
A versão atual suporta PHP >= 8.1.0, librdkafka >= 1.5.3, Kafka >= 0.8. A versão 6.x suporta PHP 7.x..8.x, librdkafka 0.11..2.x. Versões mais antigas suportam PHP 5.
O objetivo da extensão é ser uma ligação librdkafka de baixo nível e sem opinião, focada na produção e no suporte de longo prazo.
As APIs de consumidores , produtores e metadados de alto e baixo nível são suportadas.
A documentação está disponível aqui.
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
Os parâmetros de configuração usados abaixo podem ser encontrados na referência de configuração do Librdkafka
Para produzir, primeiro precisamos criar um produtor e adicionar corretores (servidores Kafka) a ele:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
Aviso Certifique-se de que seu produtor siga o encerramento adequado (veja abaixo) para não perder mensagens.
A seguir, criamos uma instância de tópico do produtor:
<?php
$ topic = $ rk -> newTopic ( " test " );
A partir daí, podemos produzir quantas mensagens quisermos, usando o método produzir:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
O primeiro argumento é a partição. RD_KAFKA_PARTITION_UA significa unassigned e permite que librdkafka escolha a partição.
O segundo argumento são sinalizadores de mensagem e devem ser 0
ou RD_KAFKA_MSG_F_BLOCK
para bloquear a produção na fila completa. A carga útil da mensagem pode ser qualquer coisa.
Isso deve ser feito antes de destruir uma instância do produtor
para garantir que todas as solicitações de produtos na fila e em andamento sejam concluídas
antes de terminar. Use um valor razoável para $timeout_ms
.
Aviso Não ligar para flush pode levar à perda de mensagens!
$ rk -> flush ( $ timeout_ms );
Caso você não se importe em enviar mensagens que ainda não foram enviadas, você pode usar purge()
antes de chamar flush()
:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
A classe RdKafkaKafkaConsumer oferece suporte à atribuição/revogação automática de partição. Veja o exemplo aqui.
Nota O consumidor de baixo nível é uma API legada, prefira usar o consumidor de alto nível
Primeiro precisamos criar um consumidor de baixo nível e adicionar corretores (servidores Kafka) a ele:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
A seguir, crie uma instância de tópico chamando o método newTopic()
e comece a consumir na partição 0:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
Em seguida, recupere as mensagens consumidas:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
Nota O consumidor de baixo nível é uma API legada, prefira usar o consumidor de alto nível
O consumo de vários tópicos e/ou partições pode ser feito dizendo ao librdkafka para encaminhar todas as mensagens desses tópicos/partições para uma fila interna e, em seguida, consumir desta fila:
Criando a fila:
<?php
$ queue = $ rk -> newQueue ();
Adicionando partições de tópico à fila:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
Em seguida, recupere as mensagens consumidas da fila:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka por padrão armazena compensações no corretor.
Se você estiver usando um arquivo local para armazenamento de deslocamento, por padrão o arquivo será criado no diretório atual, com um nome baseado no tópico e na partição. O diretório pode ser alterado configurando a propriedade de configuração offset.store.path
.
Para controlar manualmente o deslocamento, defina enable.auto.offset.store
como false
.
As configurações auto.commit.interval.ms
e auto.commit.enable
controlarão
se as compensações armazenadas serão automaticamente confirmadas no corretor e em qual intervalo.
Para controlar manualmente o deslocamento, defina enable.auto.commit
como false
.
Tempo máximo permitido entre chamadas para consumir mensagens para consumidores de alto nível.
Se este intervalo for ultrapassado o consumidor é considerado falho e o grupo será
rebalancear para reatribuir as partições a outro membro do grupo de consumidores.
group.id
é responsável por definir o ID do seu grupo de consumidores e deve ser único (e não deve mudar). Kafka o utiliza para reconhecer aplicativos e armazenar compensações para eles.
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Referência de configuração do Librdkafka
librdkafka armazenará em buffer até 1 GB de mensagens para cada partição consumida por padrão. É possível diminuir o uso de memória reduzindo o valor do parâmetro queued.max.messages.kbytes
em seus consumidores.
Cada instância de consumidor e produtor buscará metadados de tópicos em um intervalo definido pelo parâmetro topic.metadata.refresh.interval.ms
. Dependendo da versão do seu librdkafka, o parâmetro padrão é 10 segundos ou 600 segundos.
librdkafka busca os metadados para todos os tópicos do cluster por padrão. Definir topic.metadata.refresh.sparse
como a string "true"
garante que librdkafka busque apenas os tópicos que ele usa.
Definir topic.metadata.refresh.sparse
como "true"
e topic.metadata.refresh.interval.ms
como 600 segundos (mais alguma instabilidade) pode reduzir bastante a largura de banda, dependendo do número de consumidores e tópicos.
Esta configuração permite que os threads do librdkafka terminem assim que o librdkafka terminar com eles. Isso efetivamente permite que seus processos/solicitações PHP sejam encerrados rapidamente.
Ao habilitar isso, você deve mascarar o sinal assim:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
Tempo máximo que uma operação de soquete do corretor pode bloquear. Um valor mais baixo melhora a capacidade de resposta às custas de um uso ligeiramente maior da CPU.
Reduzir o valor desta configuração melhora a velocidade de desligamento. O valor define o tempo máximo que o librdkafka bloqueará em uma iteração de um loop de leitura. Isso também define com que frequência o thread principal do librdkafka verificará o encerramento.
Isto define o tempo máximo e padrão que o librdkafka irá esperar antes de enviar um lote de mensagens. Reduzir esta configuração para, por exemplo, 1 ms garante que as mensagens sejam enviadas o mais rápido possível, em vez de serem agrupadas em lote.
Foi observado que isso reduz o tempo de desligamento da instância rdkafka e do processo/solicitação PHP.
Aqui está uma configuração otimizada para baixa latência. Isso permite que um processo/solicitação PHP envie mensagens o mais rápido possível e termine rapidamente.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
É aconselhável chamar poll em intervalos regulares para atender retornos de chamada. Em php-rdkafka:3.x
poll também foi chamada durante o desligamento, portanto, não chamá-la em intervalos regulares pode
levar a um desligamento um pouco mais longo. O exemplo abaixo pesquisa até que não haja mais eventos na fila:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
A fonte da documentação pode ser encontrada aqui
Se a documentação não for suficiente, fique à vontade para fazer perguntas nos canais php-rdkafka no Gitter ou nos Grupos do Google.
Como seu IDE não é capaz de descobrir automaticamente a API php-rdkadka, você pode considerar o uso de um pacote externo que fornece um conjunto de stubs para classes, funções e constantes php-rdkafka: kwn/php-rdkafka-stubs
Se quiser contribuir, obrigado :)
Antes de começar, dê uma olhada no documento CONTRIBUTING para ver como mesclar suas alterações.
Documentação copiada de librdkafka.
Autores: veja colaboradores.
php-rdkafka é lançado sob a licença do MIT.