PHP-rdkafka는 librdkafka를 기반으로 하는 안정적 이고 생산 준비가 되어 있으며 빠른 PHP용 Kafka 클라이언트입니다.
현재 버전은 PHP >= 8.1.0, librdkafka >= 1.5.3, Kafka >= 0.8을 지원합니다. 버전 6.x는 PHP 7.x..8.x, librdkafka 0.11..2.x를 지원합니다. 이전 버전은 PHP 5를 지원합니다.
확장의 목표는 생산 및 장기 지원에 초점을 맞춘 낮은 수준의 의견이 없는 librdkafka 바인딩이 되는 것입니다.
높은 수준과 낮은 수준의 Consumer , producer 및 메타데이터 API가 지원됩니다.
문서는 여기에서 확인할 수 있습니다.
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
아래에 사용된 구성 매개변수는 Librdkafka 구성 참조에서 찾을 수 있습니다.
생산을 위해서는 먼저 생산자를 생성하고 여기에 브로커(Kafka 서버)를 추가해야 합니다.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
경고 메시지가 손실되지 않도록 생산자가 적절한 종료(아래 참조)를 따르도록 하십시오.
다음으로 생산자로부터 주제 인스턴스를 생성합니다.
<?php
$ topic = $ rk -> newTopic ( " test " );
거기서부터 생성 메소드를 사용하여 원하는 만큼의 메시지를 생성할 수 있습니다.
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
첫 번째 인수는 파티션입니다. RD_KAFKA_PARTITION_UA는 할당되지 않음 을 나타내며 librdkafka가 파티션을 선택할 수 있도록 합니다.
두 번째 인수는 메시지 플래그이며 0이어야 합니다.
또는 RD_KAFKA_MSG_F_BLOCK
하여 전체 대기열에서 생성을 차단합니다. 메시지 페이로드는 무엇이든 될 수 있습니다.
이 작업은 생산자 인스턴스를 삭제하기 전에 수행되어야 합니다.
대기 중인 생산 요청과 기내 생산 요청이 모두 완료되었는지 확인하기 위해
종료하기 전에. $timeout_ms
에 합리적인 값을 사용하십시오.
경고 플러시를 호출하지 않으면 메시지가 손실될 수 있습니다!
$ rk -> flush ( $ timeout_ms );
아직 전송되지 않은 메시지를 보내는 것에 신경 쓰지 않는 경우, flush()
호출하기 전에 purge()
사용할 수 있습니다.
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
RdKafkaKafkaConsumer 클래스는 자동 파티션 할당/해지를 지원합니다. 여기의 예를 참조하세요.
참고 낮은 수준 소비자는 레거시 API이므로 높은 수준 소비자를 사용하는 것이 좋습니다.
먼저 낮은 수준의 소비자를 생성하고 여기에 브로커(Kafka 서버)를 추가해야 합니다.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
다음으로 newTopic()
메서드를 호출하여 주제 인스턴스를 생성하고 파티션 0에서 사용을 시작합니다.
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
다음으로 소비된 메시지를 검색합니다.
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
참고 낮은 수준 소비자는 레거시 API이므로 높은 수준 소비자를 사용하는 것이 좋습니다.
여러 주제 및/또는 파티션에서 사용하려면 librdkafka에 이러한 주제/파티션의 모든 메시지를 내부 대기열로 전달한 다음 이 대기열에서 사용하도록 지시하면 됩니다.
대기열 만들기:
<?php
$ queue = $ rk -> newQueue ();
대기열에 주제 파티션 추가:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
다음으로 큐에서 소비된 메시지를 검색합니다.
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
기본적으로 librdkafka는 브로커에 오프셋을 저장합니다.
오프셋 저장을 위해 로컬 파일을 사용하는 경우 기본적으로 파일은 주제와 파티션을 기반으로 한 이름으로 현재 디렉터리에 생성됩니다. offset.store.path
구성 속성을 설정하여 디렉터리를 변경할 수 있습니다.
오프셋을 수동으로 제어하려면 enable.auto.offset.store
false
로 설정하세요.
auto.commit.interval.ms
및 auto.commit.enable
설정이 제어합니다.
저장된 오프셋이 브로커에 자동으로 커밋되는지 여부와 간격입니다.
오프셋을 수동으로 제어하려면 enable.auto.commit
false
로 설정하세요.
상위 수준 소비자에 대한 메시지 소비 호출 사이에 허용되는 최대 시간입니다.
이 간격을 초과하면 소비자는 실패한 것으로 간주되고 그룹은
다른 소비자 그룹 구성원에게 파티션을 재할당하기 위해 재조정합니다.
group.id
소비자 그룹 ID 설정을 담당하며 고유해야 하며 변경되어서는 안 됩니다. Kafka는 이를 사용하여 애플리케이션을 인식하고 그에 대한 오프셋을 저장합니다.
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Librdkafka 구성 참조
librdkafka는 기본적으로 소비된 각 파티션에 대해 최대 1GB의 메시지를 버퍼링합니다. 소비자의 queued.max.messages.kbytes
매개변수 값을 줄여 메모리 사용량을 낮출 수 있습니다.
각 소비자 및 생산자 인스턴스는 topic.metadata.refresh.interval.ms
매개 변수에 정의된 간격으로 주제 메타데이터를 가져옵니다. librdkafka 버전에 따라 매개변수의 기본값은 10초 또는 600초입니다.
librdkafka는 기본적으로 클러스터의 모든 주제에 대한 메타데이터를 가져옵니다. topic.metadata.refresh.sparse
"true"
문자열로 설정하면 librdkafka가 자신이 사용하는 주제만 가져옵니다.
topic.metadata.refresh.sparse
"true"
로 설정하고 topic.metadata.refresh.interval.ms
600초(일부 지터 포함)로 설정하면 소비자 및 주제 수에 따라 대역폭을 크게 줄일 수 있습니다.
이 설정을 사용하면 librdkafka 스레드가 완료되는 즉시 librdkafka 스레드가 종료됩니다. 이를 통해 PHP 프로세스/요청을 효과적으로 신속하게 종료할 수 있습니다.
이를 활성화할 때 다음과 같이 신호를 마스크해야 합니다.
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
브로커 소켓 작업이 차단될 수 있는 최대 시간입니다. 값이 낮을수록 CPU 사용량이 약간 높아지는 대신 응답성이 향상됩니다.
이 설정 값을 줄이면 종료 속도가 향상됩니다. 이 값은 librdkafka가 읽기 루프의 한 번의 반복에서 차단하는 최대 시간을 정의합니다. 이는 또한 기본 librdkafka 스레드가 종료를 확인하는 빈도를 정의합니다.
이는 librdkafka가 메시지 배치를 보내기 전에 기다리는 최대 및 기본 시간을 정의합니다. 이 설정을 예를 들어 1ms로 줄이면 메시지가 일괄 처리되는 대신 최대한 빨리 전송됩니다.
이는 rdkafka 인스턴스와 PHP 프로세스/요청의 종료 시간을 줄이는 것으로 나타났습니다.
낮은 지연 시간에 최적화된 구성은 다음과 같습니다. 이를 통해 PHP 프로세스/요청이 최대한 빨리 메시지를 보내고 빠르게 종료될 수 있습니다.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
콜백을 제공하려면 정기적으로 폴링을 호출하는 것이 좋습니다. php-rdkafka:3.x
에서
폴링은 종료 중에도 호출되었으므로 정기적으로 호출하지 않으면
종료 시간이 약간 길어집니다. 아래 예에서는 대기열에 더 이상 이벤트가 없을 때까지 폴링합니다.
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
문서의 출처는 여기에서 찾을 수 있습니다
문서가 충분하지 않은 경우 Gitter 또는 Google 그룹스의 php-rdkafka 채널에 자유롭게 질문하세요.
IDE가 php-rdkadka API를 자동으로 검색할 수 없기 때문에 php-rdkafka 클래스, 함수 및 상수에 대한 스텁 세트를 제공하는 외부 패키지 사용을 고려할 수 있습니다: kwn/php-rdkafka-stubs
기여하고 싶다면 감사합니다 :)
시작하기 전에 CONTRIBUTING 문서를 살펴보고 변경 사항을 병합하는 방법을 확인하세요.
librdkafka에서 복사된 문서입니다.
저자: 기여자를 참조하세요.
php-rdkafka는 MIT 라이센스로 배포됩니다.