PHP-rdkafka — это стабильный , готовый к использованию и быстрый клиент Kafka для PHP, основанный на librdkafka.
Текущая версия поддерживает PHP >= 8.1.0, librdkafka >= 1.5.3, Kafka >= 0.8. Версия 6.x поддерживает PHP 7.x..8.x, librdkafka 0.11..2.x. Старые версии поддерживают PHP 5.
Цель расширения — стать низкоуровневой независимой привязкой librdkafka, ориентированной на производство и долгосрочную поддержку.
Поддерживаются API-интерфейсы потребителей высокого и низкого уровня, производителя и метаданных .
Документация доступна здесь.
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
Параметры конфигурации, используемые ниже, можно найти в справочнике по конфигурации Librdkafka.
Для производства нам сначала нужно создать производителя и добавить к нему брокеров (серверы Kafka):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
Предупреждение. Убедитесь, что ваш производитель выполнил правильное завершение работы (см. ниже), чтобы не потерять сообщения.
Далее мы создаем экземпляр темы от производителя:
<?php
$ topic = $ rk -> newTopic ( " test " );
Отсюда мы можем создавать столько сообщений, сколько захотим, используя метод производства:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
Первый аргумент — это раздел. RD_KAFKA_PARTITION_UA означает unassigned и позволяет librdkafka выбрать раздел.
Второй аргумент — это флаги сообщений и должен быть либо 0, либо 0.
или RD_KAFKA_MSG_F_BLOCK
, чтобы заблокировать производство при полной очереди. Полезная нагрузка сообщения может быть любой.
Это следует сделать до уничтожения экземпляра производителя.
чтобы убедиться, что все запросы на производство, находящиеся в очереди и в процессе выполнения, выполнены.
перед прекращением. Используйте разумное значение для $timeout_ms
.
Предупреждение. Невыполнение сброса может привести к потере сообщения!
$ rk -> flush ( $ timeout_ms );
Если вас не волнует отправка еще не отправленных сообщений, вы можете использовать purge()
перед flush()
:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
Класс RdKafkaKafkaConsumer поддерживает автоматическое назначение/отзыв разделов. См. пример здесь.
Примечание. Низкоуровневый потребитель — это устаревший API, поэтому лучше использовать высокоуровневый потребитель.
Сначала нам нужно создать потребителя низкого уровня и добавить к нему брокеров (серверы Kafka):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
Затем создайте экземпляр темы, вызвав метод newTopic()
, и начните использовать раздел 0:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
Затем извлеките использованные сообщения:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
Примечание. Низкоуровневый потребитель — это устаревший API, поэтому лучше использовать высокоуровневый потребитель.
Потребление из нескольких тем и/или разделов можно выполнить, указав librdkafka пересылать все сообщения из этих тем/разделов во внутреннюю очередь, а затем потреблять из этой очереди:
Создание очереди:
<?php
$ queue = $ rk -> newQueue ();
Добавление разделов тем в очередь:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
Затем извлеките использованные сообщения из очереди:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka по умолчанию хранит смещения брокера.
Если вы используете локальный файл для смещенного хранения, то по умолчанию файл создается в текущем каталоге с именем, основанным на теме и разделе. Каталог можно изменить, установив свойство конфигурации offset.store.path
.
Чтобы вручную управлять смещением, установите для параметра enable.auto.offset.store
значение false
.
Настройки auto.commit.interval.ms
и auto.commit.enable
будут контролировать
будут ли сохраненные смещения автоматически переданы брокеру и в каком интервале.
Чтобы вручную управлять смещением, установите для enable.auto.commit
значение false
.
Максимально допустимое время между вызовами для получения сообщений для потребителей высокого уровня.
Если этот интервал превышен, потребитель считается вышедшим из строя и группа будет отключена.
выполнить перебалансировку, чтобы переназначить разделы другому члену группы потребителей.
group.id
отвечает за установку идентификатора вашей группы потребителей, и он должен быть уникальным (и не должен меняться). Kafka использует его для распознавания приложений и сохранения для них смещений.
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Справочник по конфигурации Librdkafka
librdkafka по умолчанию буферизует до 1 ГБ сообщений для каждого используемого раздела. Вы можете снизить использование памяти, уменьшив значение параметра queued.max.messages.kbytes
для ваших потребителей.
Каждый экземпляр потребителя и производителя будет получать метаданные тем с интервалом, определяемым параметром topic.metadata.refresh.interval.ms
. В зависимости от вашей версии librdkafka этот параметр по умолчанию равен 10 секундам или 600 секундам.
librdkafka по умолчанию извлекает метаданные для всех тем кластера. Установка для topic.metadata.refresh.sparse
строки "true"
гарантирует, что librdkafka будет получать только те темы, которые он использует.
Установка для topic.metadata.refresh.sparse
значения "true"
и topic.metadata.refresh.interval.ms
значения 600 секунд (плюс некоторое дрожание) может значительно снизить пропускную способность, в зависимости от количества потребителей и тем.
Этот параметр позволяет потокам librdkafka завершать работу, как только librdkafka завершает работу с ними. Это эффективно позволяет вашим PHP-процессам/запросам быстро завершаться.
При включении этого параметра вам необходимо замаскировать сигнал следующим образом:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
Максимальное время, на которое может блокироваться операция сокета брокера. Более низкое значение улучшает скорость реагирования за счет несколько более высокой загрузки ЦП.
Уменьшение значения этого параметра увеличивает скорость завершения работы. Значение определяет максимальное время, на которое librdkafka будет блокироваться за одну итерацию цикла чтения. Это также определяет, как часто основной поток librdkafka будет проверять завершение.
Это определяет максимальное время и время по умолчанию, которое librdkafka будет ждать перед отправкой пакета сообщений. Уменьшение этого параметра, например, до 1 мс, гарантирует, что сообщения будут отправляться как можно скорее, а не группироваться.
Было замечено, что это сокращает время завершения работы экземпляра rdkafka и процесса/запроса PHP.
Вот конфигурация, оптимизированная для низкой задержки. Это позволяет процессу/запросу PHP отправлять сообщения как можно скорее и быстро завершаться.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
Рекомендуется вызывать опрос через регулярные промежутки времени для обслуживания обратных вызовов. В php-rdkafka:3.x
опрос также вызывался во время выключения, поэтому невыполнение его через регулярные промежутки времени может
привести к несколько более длительному отключению. В приведенном ниже примере опрос производится до тех пор, пока в очереди не останется событий:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
Источник документации можно найти здесь.
Если документации недостаточно, не стесняйтесь задавать вопросы на каналах php-rdkafka в Gitter или группах Google.
Поскольку ваша IDE не может автоматически обнаруживать API php-rdkafka, вы можете рассмотреть возможность использования внешнего пакета, предоставляющего набор заглушек для классов, функций и констант php-rdkafka: kwn/php-rdkafka-stubs
Если вы хотите внести свой вклад, спасибо :)
Прежде чем начать, пожалуйста, ознакомьтесь с документом CONTRIBUTING, чтобы узнать, как объединить ваши изменения.
Документация скопирована с librdkafka.
Авторы: см. авторов.
php-rdkafka выпускается под лицензией MIT.