ИЗ-ЗА ПРОБЛЕМ С РЕЛИЗАМИ, ВРЕМЯ ПРЕДЛАГАЕТСЯ ИСПОЛЬЗОВАТЬ https://github.com/wbarnha/kafka-python-ng.
Клиент Python для системы распределенной потоковой обработки Apache Kafka. kafka-python спроектирован так, чтобы функционировать так же, как официальный Java-клиент, с добавлением питонических интерфейсов (например, потребительских итераторов).
kafka-python лучше всего использовать с новыми брокерами (0.9+), но он обратно совместим со старыми версиями (до 0.8.0). Некоторые функции будут доступны только у новых брокеров. Например, полностью скоординированные группы потребителей, то есть динамическое назначение разделов нескольким потребителям в одной группе, требуют использования брокеров Kafka версии 0,9+. Поддержка этой функции в более ранних версиях брокера потребует написания и поддержки специального кода выборов руководства и проверки членства/здоровья (возможно, с использованием Zookeeper или Consul). Для старых брокеров вы можете добиться чего-то подобного, вручную назначая разные разделы каждому экземпляру потребителя с помощью инструментов управления конфигурациями, таких как Chef, Ansible и т. д. Этот подход будет работать нормально, хотя он не поддерживает перебалансировку в случае сбоев. Дополнительную информацию см. в <https://kafka-python.readthedocs.io/en/master/compatibility.html>.
Обратите внимание, что основная ветка может содержать невыпущенные функции. Документацию по выпуску см. в readthedocs и/или встроенной справке Python.
>>> pip install kafka - python
KafkaConsumer — это потребитель сообщений высокого уровня, работа которого максимально аналогична официальному Java-клиенту. Полная поддержка скоординированных групп потребителей требует использования брокеров Kafka, поддерживающих групповые API: kafka v0.9+.
См. <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> для получения подробной информации об API и конфигурации.
Итератор потребителя возвращает ConsumerRecords, которые представляют собой простые именованные кортежи, предоставляющие основные атрибуты сообщения: тему, раздел, смещение, ключ и значение:
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer — это асинхронный производитель сообщений высокого уровня. Этот класс должен работать максимально похоже на официальный Java-клиент. Дополнительную информацию см. в <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
KafkaProducer можно без проблем использовать в разных потоках, в отличие от KafkaConsumer, который не может.
Хотя KafkaConsumer можно использовать локально в потоке, рекомендуется многопроцессорная обработка.
kafka-python поддерживает следующие форматы сжатия:
gzip поддерживается изначально, остальные требуют установки дополнительных библиотек. Дополнительную информацию см. в <https://kafka-python.readthedocs.io/en/master/install.html>.
Kafka использует контрольные суммы CRC32 для проверки сообщений. kafka-python включает в себя реализацию на чистом Python для совместимости. Чтобы повысить производительность приложений с высокой пропускной способностью, kafka-python будет использовать crc32c для оптимизации собственного кода, если он установлен. Инструкции по установке см. в <https://kafka-python.readthedocs.io/en/master/install.html>. См. https://pypi.org/project/crc32c/ для получения подробной информации о базовой библиотеке crc32c.
Вторичная цель kafka-python — предоставить простой в использовании уровень протокола для взаимодействия с брокерами Kafka через repl Python. Это полезно для тестирования, проб и общих экспериментов. Поддержка протокола используется для включения метода KafkaClient.check_version(), который проверяет брокера Kafka и пытается определить, какую версию он использует (от 0.8.0 до 2.6+).