EN RAISON DE PROBLEMES AVEC LES VERSIONS, IL EST SUGGÉRÉ D'UTILISER https://github.com/wbarnha/kafka-python-ng POUR LE MOMENT
Client Python pour le système de traitement de flux distribué Apache Kafka. kafka-python est conçu pour fonctionner un peu comme le client Java officiel, avec une pincée d'interfaces pythoniques (par exemple, des itérateurs consommateurs).
kafka-python est mieux utilisé avec les courtiers les plus récents (0.9+), mais est rétrocompatible avec les anciennes versions (jusqu'à 0.8.0). Certaines fonctionnalités ne seront activées que sur les courtiers les plus récents. Par exemple, des groupes de consommateurs entièrement coordonnés (c'est-à-dire l'attribution dynamique de partitions à plusieurs consommateurs dans le même groupe) nécessitent l'utilisation de courtiers kafka 0.9+. La prise en charge de cette fonctionnalité pour les versions antérieures du courtier nécessiterait l'écriture et la maintenance d'un code personnalisé d'élection de direction et d'adhésion/bilan de santé (peut-être en utilisant zookeeper ou consul). Pour les courtiers plus anciens, vous pouvez obtenir quelque chose de similaire en attribuant manuellement différentes partitions à chaque instance consommateur avec des outils de gestion de configuration tels que chef, ansible, etc. Cette approche fonctionnera bien, même si elle ne prend pas en charge le rééquilibrage en cas d'échec. Voir <https://kafka-python.readthedocs.io/en/master/compatibility.html> pour plus de détails.
Veuillez noter que la branche master peut contenir des fonctionnalités inédites. Pour la documentation de la version, veuillez consulter readthedocs et/ou l'aide en ligne de Python.
>>> pip install kafka - python
KafkaConsumer est un consommateur de messages de haut niveau, destiné à fonctionner de la manière la plus similaire possible au client Java officiel. La prise en charge complète des groupes de consommateurs coordonnés nécessite l'utilisation de courtiers kafka prenant en charge les API de groupe : kafka v0.9+.
Voir <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> pour plus de détails sur l'API et la configuration.
L'itérateur consommateur renvoie ConsumerRecords, qui sont de simples tuples nommés qui exposent les attributs de message de base : sujet, partition, décalage, clé et valeur :
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer est un producteur de messages asynchrones de haut niveau. La classe est destinée à fonctionner de la manière la plus similaire possible au client Java officiel. Voir <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html> pour plus de détails.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
Le KafkaProducer peut être utilisé sur plusieurs threads sans problème, contrairement au KafkaConsumer qui ne le peut pas.
Bien qu'il soit possible d'utiliser KafkaConsumer de manière locale, le multitraitement est recommandé.
kafka-python prend en charge les formats de compression suivants :
gzip est supporté nativement, les autres nécessitent l'installation de bibliothèques supplémentaires. Voir <https://kafka-python.readthedocs.io/en/master/install.html> pour plus d'informations.
Kafka utilise les sommes de contrôle CRC32 pour valider les messages. kafka-python inclut une implémentation Python pure pour des raisons de compatibilité. Pour améliorer les performances des applications à haut débit, kafka-python utilisera crc32c pour le code natif optimisé s'il est installé. Voir <https://kafka-python.readthedocs.io/en/master/install.html> pour les instructions d'installation. Voir https://pypi.org/project/crc32c/ pour plus de détails sur la bibliothèque crc32c sous-jacente.
Un objectif secondaire de kafka-python est de fournir une couche de protocole facile à utiliser pour interagir avec les courtiers kafka via le repl python. Ceci est utile pour les tests, les sondages et l’expérimentation générale. La prise en charge du protocole est exploitée pour activer une méthode KafkaClient.check_version() qui sonde un courtier Kafka et tente d'identifier la version qu'il exécute (0.8.0 à 2.6+).