نظرًا لوجود مشكلات تتعلق بالإصدارات، يُقترح استخدام https://github.com/wbarnha/kafka-python-ng في الوقت الحالي
عميل Python لنظام معالجة التدفق الموزع Apache Kafka. تم تصميم kafka-python ليعمل بشكل يشبه إلى حد كبير عميل Java الرسمي، مع عدد قليل من واجهات pythonic (على سبيل المثال، التكرارات الاستهلاكية).
من الأفضل استخدام kafka-python مع الوسطاء الأحدث (0.9+)، ولكنه متوافق مع الإصدارات الأقدم (إلى 0.8.0). سيتم تمكين بعض الميزات فقط على الوسطاء الجدد. على سبيل المثال، تتطلب مجموعات المستهلكين المنسقة بالكامل - أي تعيين القسم الديناميكي لعدة مستهلكين في نفس المجموعة - استخدام 0.9+ من وسطاء كافكا. قد يتطلب دعم هذه الميزة لإصدارات الوسيط السابقة كتابة والحفاظ على انتخابات القيادة المخصصة ورمز التحقق من العضوية/الصحية (ربما باستخدام حارس الحديقة أو القنصل). بالنسبة للوسطاء الأقدم، يمكنك تحقيق شيء مماثل عن طريق تعيين أقسام مختلفة يدويًا لكل مثيل مستهلك باستخدام أدوات إدارة التكوين مثل Chef وAnsible وما إلى ذلك. سيعمل هذا الأسلوب بشكل جيد، على الرغم من أنه لا يدعم إعادة التوازن عند الفشل. راجع <https://kafka-python.readthedocs.io/en/master/compatibility.html> لمزيد من التفاصيل.
يرجى ملاحظة أن الفرع الرئيسي قد يحتوي على ميزات لم يتم إصدارها. للحصول على وثائق الإصدار، يرجى الاطلاع على readthedocs و/أو تعليمات python المضمنة.
>>> pip install kafka - python
KafkaConsumer هو مستهلك رسائل عالي المستوى، يهدف إلى العمل بشكل مماثل قدر الإمكان لعميل Java الرسمي. يتطلب الدعم الكامل لمجموعات المستهلكين المنسقة استخدام وسطاء كافكا الذين يدعمون واجهات برمجة التطبيقات للمجموعة: kafka v0.9+.
راجع <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> للحصول على تفاصيل واجهة برمجة التطبيقات والتكوين.
يقوم مكرر المستهلك بإرجاع سجلات المستهلك، وهي عبارة عن مجموعات مسماة بسيطة تعرض سمات الرسالة الأساسية: الموضوع، والقسم، والإزاحة، والمفتاح، والقيمة:
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer هو منتج رسائل غير متزامن عالي المستوى. يهدف الفصل إلى العمل بشكل مشابه قدر الإمكان لعميل Java الرسمي. راجع <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html> لمزيد من التفاصيل.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
يمكن استخدام KafkaProducer عبر سلاسل المحادثات دون مشكلة، على عكس KafkaConsumer الذي لا يمكنه ذلك.
في حين أنه من الممكن استخدام KafkaConsumer بطريقة محلية، إلا أنه يوصى بالمعالجة المتعددة.
يدعم kafka-python تنسيقات الضغط التالية:
يتم دعم gzip محليًا، بينما يتطلب الآخرون تثبيت مكتبات إضافية. راجع <https://kafka-python.readthedocs.io/en/master/install.html> لمزيد من المعلومات.
يستخدم كافكا المجموع الاختباري CRC32 للتحقق من صحة الرسائل. يتضمن kafka-python تطبيقًا خالصًا لـ python من أجل التوافق. لتحسين أداء التطبيقات عالية الإنتاجية، سيستخدم kafka-python crc32c للحصول على كود أصلي محسّن إذا تم تثبيته. راجع <https://kafka-python.readthedocs.io/en/master/install.html> للحصول على تعليمات التثبيت. راجع https://pypi.org/project/crc32c/ للحصول على تفاصيل حول lib crc32c الأساسي.
الهدف الثانوي لـ kafka-python هو توفير طبقة بروتوكول سهلة الاستخدام للتفاعل مع وسطاء kafka عبر python repl. وهذا مفيد للاختبار والتحقق والتجريب العام. يتم الاستفادة من دعم البروتوكول لتمكين طريقة KafkaClient.check_version() التي تستكشف وسيط kafka وتحاول تحديد الإصدار الذي يعمل عليه (0.8.0 إلى 2.6+).