DEVIDO A PROBLEMAS COM LANÇAMENTOS, SUGERE-SE USAR https://github.com/wbarnha/kafka-python-ng POR MOMENTO
Cliente Python para o sistema de processamento de fluxo distribuído Apache Kafka. kafka-python foi projetado para funcionar de forma muito semelhante ao cliente java oficial, com uma pitada de interfaces pythônicas (por exemplo, iteradores de consumo).
kafka-python é melhor usado com corretores mais recentes (0.9+), mas é compatível com versões anteriores (até 0.8.0). Alguns recursos só serão habilitados em corretoras mais recentes. Por exemplo, grupos de consumidores totalmente coordenados – ou seja, atribuição de partição dinâmica a vários consumidores no mesmo grupo – requerem o uso de corretores kafka 0,9+. O suporte a esse recurso para versões anteriores do corretor exigiria escrever e manter um código personalizado de eleição de liderança e de associação/verificação de integridade (talvez usando zookeeper ou cônsul). Para corretores mais antigos, você pode conseguir algo semelhante atribuindo manualmente diferentes partições a cada instância do consumidor com ferramentas de gerenciamento de configuração como chef, ansible, etc. Essa abordagem funcionará bem, embora não ofereça suporte ao reequilíbrio em caso de falhas. Consulte <https://kafka-python.readthedocs.io/en/master/compatibility.html> para obter mais detalhes.
Observe que o branch master pode conter recursos não lançados. Para documentação de lançamento, consulte readthedocs e/ou ajuda embutida do python.
>>> pip install kafka - python
KafkaConsumer é um consumidor de mensagens de alto nível, destinado a operar da forma mais semelhante possível ao cliente Java oficial. O suporte total para grupos de consumidores coordenados requer o uso de corretores kafka que suportam as APIs do Grupo: kafka v0.9+.
Consulte <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> para obter detalhes de API e configuração.
O iterador do consumidor retorna ConsumerRecords, que são tuplas nomeadas simples que expõem atributos básicos da mensagem: tópico, partição, deslocamento, chave e valor:
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer é um produtor de mensagens assíncronas de alto nível. A classe pretende operar da forma mais semelhante possível ao cliente Java oficial. Consulte <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html> para obter mais detalhes.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
O KafkaProducer pode ser usado em threads sem problemas, ao contrário do KafkaConsumer que não pode.
Embora seja possível usar o KafkaConsumer de maneira local de thread, o multiprocessamento é recomendado.
kafka-python suporta os seguintes formatos de compactação:
gzip é suportado nativamente, os outros requerem a instalação de bibliotecas adicionais. Consulte <https://kafka-python.readthedocs.io/en/master/install.html> para obter mais informações.
Kafka usa somas de verificação CRC32 para validar mensagens. kafka-python inclui uma implementação python pura para compatibilidade. Para melhorar o desempenho de aplicativos de alto rendimento, o kafka-python usará crc32c para código nativo otimizado, se instalado. Consulte <https://kafka-python.readthedocs.io/en/master/install.html> para obter instruções de instalação. Consulte https://pypi.org/project/crc32c/ para obter detalhes sobre a biblioteca crc32c subjacente.
Um objetivo secundário do kafka-python é fornecer uma camada de protocolo fácil de usar para interagir com corretores kafka por meio do python repl. Isso é útil para testes, sondagens e experimentações gerais. O suporte ao protocolo é aproveitado para ativar um método KafkaClient.check_version() que investiga um corretor kafka e tenta identificar qual versão ele está executando (0.8.0 a 2.6+).