Aufgrund von Problemen mit Releases wird empfohlen, vorerst https://github.com/wbarnha/kafka-python-ng zu verwenden
Python-Client für das verteilte Stream-Verarbeitungssystem Apache Kafka. kafka-python ist so konzipiert, dass es ähnlich wie der offizielle Java-Client funktioniert, mit einer Prise Python-Schnittstellen (z. B. Consumer-Iteratoren).
kafka-python lässt sich am besten mit neueren Brokern (0.9+) verwenden, ist aber abwärtskompatibel mit älteren Versionen (bis 0.8.0). Einige Funktionen werden nur auf neueren Brokern aktiviert. Beispielsweise erfordern vollständig koordinierte Verbrauchergruppen – d. h. die dynamische Partitionszuweisung an mehrere Verbraucher in derselben Gruppe – die Verwendung von Kafka-Brokern ab Version 0.9. Um diese Funktion für frühere Broker-Versionen zu unterstützen, müssten benutzerdefinierte Codes für Führungswahlen und Mitgliedschafts-/Gesundheitsprüfungen geschrieben und gepflegt werden (möglicherweise mithilfe von zookeeper oder consul). Bei älteren Brokern können Sie etwas Ähnliches erreichen, indem Sie jeder Verbraucherinstanz mit Konfigurationsverwaltungstools wie Chef, Ansible usw. manuell unterschiedliche Partitionen zuweisen. Dieser Ansatz funktioniert einwandfrei, unterstützt jedoch keine Neuverteilung bei Fehlern. Weitere Informationen finden Sie unter <https://kafka-python.readthedocs.io/en/master/compatibility.html>.
Bitte beachten Sie, dass der Master-Zweig möglicherweise unveröffentlichte Funktionen enthält. Die Release-Dokumentation finden Sie in readthedocs und/oder in der Inline-Hilfe von Python.
>>> pip install kafka - python
KafkaConsumer ist ein Nachrichtenkonsument auf hoher Ebene, der möglichst ähnlich wie der offizielle Java-Client funktionieren soll. Für die vollständige Unterstützung koordinierter Verbrauchergruppen ist die Verwendung von Kafka-Brokern erforderlich, die die Gruppen-APIs unterstützen: kafka v0.9+.
API- und Konfigurationsdetails finden Sie unter <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>.
Der Consumer-Iterator gibt ConsumerRecords zurück, bei denen es sich um einfache benannte Tupel handelt, die grundlegende Nachrichtenattribute offenlegen: Thema, Partition, Offset, Schlüssel und Wert:
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer ist ein asynchroner Nachrichtenproduzent auf hoher Ebene. Die Klasse soll möglichst ähnlich wie der offizielle Java-Client funktionieren. Weitere Informationen finden Sie unter <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
Der KafkaProducer kann problemlos threadübergreifend verwendet werden, im Gegensatz zum KafkaConsumer, der dies nicht kann.
Während es möglich ist, den KafkaConsumer threadlokal zu verwenden, wird Multiprocessing empfohlen.
kafka-python unterstützt die folgenden Komprimierungsformate:
gzip wird nativ unterstützt, die anderen erfordern die Installation zusätzlicher Bibliotheken. Weitere Informationen finden Sie unter <https://kafka-python.readthedocs.io/en/master/install.html>.
Kafka verwendet CRC32-Prüfsummen, um Nachrichten zu validieren. kafka-python enthält aus Kompatibilitätsgründen eine reine Python-Implementierung. Um die Leistung für Anwendungen mit hohem Durchsatz zu verbessern, verwendet Kafka-Python crc32c für optimierten nativen Code, sofern installiert. Installationsanweisungen finden Sie unter <https://kafka-python.readthedocs.io/en/master/install.html>. Weitere Informationen zur zugrunde liegenden crc32c-Bibliothek finden Sie unter https://pypi.org/project/crc32c/.
Ein sekundäres Ziel von Kafka-Python ist die Bereitstellung einer benutzerfreundlichen Protokollschicht für die Interaktion mit Kafka-Brokern über die Python-Repl. Dies ist nützlich für Tests, Sondierungen und allgemeine Experimente. Die Protokollunterstützung wird genutzt, um eine KafkaClient.check_version()-Methode zu aktivieren, die einen Kafka-Broker prüft und versucht, herauszufinden, welche Version er ausführt (0.8.0 bis 2.6+).