لقد تم إهمال هذه المكتبة ولم تعد تتم إدارتها أو دعمها. يمكن العثور على مشروع المجتمع النشط الحالي على https://github.com/faust-streaming/faust
إصدار: | 1.10.4 |
---|---|
الويب: | http://faust.readthedocs.io/ |
تحميل: | http://pypi.org/project/faust |
مصدر: | http://github.com/robinhood/faust |
الكلمات الرئيسية: | توزيع، دفق، غير متزامن، معالجة، بيانات، قائمة انتظار، إدارة الحالة |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust هي مكتبة لمعالجة التدفق، تنقل الأفكار من Kafka Streams إلى Python.
يتم استخدامه في Robinhood لبناء أنظمة موزعة عالية الأداء وخطوط أنابيب بيانات في الوقت الفعلي تعالج مليارات الأحداث كل يوم.
يوفر Faust كلاً من معالجة التدفق ومعالجة الأحداث ، ومشاركة التشابه مع أدوات مثل Kafka Streams، وApache Spark/Storm/Samza/Flink،
إنه لا يستخدم DSL، إنه مجرد Python! هذا يعني أنه يمكنك استخدام جميع مكتبات Python المفضلة لديك عند معالجة الدفق: NumPy، PyTorch، Pandas، NLTK، Django، Flask، SQLAlchemy، ++
يتطلب Faust إصدار Python 3.6 أو إصدار أحدث لبناء الجملة الجديد غير المتزامن/الانتظار والتعليقات التوضيحية من النوع المتغير.
فيما يلي مثال لمعالجة مجموعة من الطلبات الواردة:
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
يحدد مصمم الديكور "معالج الدفق" الذي يستهلك بشكل أساسي من موضوع كافكا ويفعل شيئًا لكل حدث يتلقاه.
الوكيل عبارة عن وظيفة async def
، لذا يمكنه أيضًا إجراء عمليات أخرى بشكل غير متزامن، مثل طلبات الويب.
يمكن لهذا النظام أن يستمر في حالته، ويعمل مثل قاعدة البيانات. تتم تسمية الجداول بمخازن المفاتيح/القيم الموزعة التي يمكنك استخدامها كقواميس بايثون عادية.
يتم تخزين الجداول محليًا على كل جهاز باستخدام قاعدة بيانات مدمجة فائقة السرعة مكتوبة بلغة C++، تسمى RocksDB.
يمكن للجداول أيضًا تخزين الأعداد المجمعة التي يتم "إطارها" بشكل اختياري حتى تتمكن من تتبع "عدد النقرات من اليوم الأخير" أو "عدد النقرات في الساعة الأخيرة". على سبيل المثال. مثل Kafka Streams، نحن ندعم نوافذ الوقت المتقلبة والقفز والانزلاق، ويمكن انتهاء صلاحية النوافذ القديمة لمنع امتلاء البيانات.
من أجل الموثوقية نستخدم موضوع كافكا باسم "سجل الكتابة المسبق". كلما تم تغيير المفتاح نقوم بالنشر في سجل التغيير. تستهلك العقد الاحتياطية من سجل التغيير هذا للاحتفاظ بنسخة طبق الأصل من البيانات وتمكين الاسترداد الفوري في حالة فشل أي من العقد.
بالنسبة للمستخدم، يعد الجدول مجرد قاموس، ولكن البيانات تظل قائمة بين عمليات إعادة التشغيل ويتم نسخها عبر العقد، لذلك عند تجاوز الفشل يمكن للعقد الأخرى أن تتولى المسؤولية تلقائيًا.
يمكنك حساب مشاهدات الصفحة حسب عنوان URL:
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
يتم تقسيم البيانات المرسلة إلى موضوع كافكا، مما يعني أنه سيتم تقسيم النقرات حسب عنوان URL بحيث يتم تسليم كل عدد لنفس عنوان URL إلى نفس مثيل عامل Faust.
يدعم Faust أي نوع من بيانات التدفق: البايتات وUnicode والهياكل المتسلسلة، ولكنه يأتي أيضًا مع "نماذج" تستخدم بناء جملة Python الحديث لوصف كيفية تسلسل المفاتيح والقيم في التدفقات:
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
تتم كتابة Faust بشكل ثابت، باستخدام مدقق النوع mypy
، حتى تتمكن من الاستفادة من الأنواع الثابتة عند كتابة التطبيقات.
كود مصدر Faust صغير الحجم ومنظم جيدًا ويعمل كمورد جيد لتعلم كيفية تنفيذ Kafka Streams.
فاوست سهل الاستخدام للغاية. للبدء في استخدام حلول معالجة التدفق الأخرى، يتعين عليك تعقيد مشاريع hello-world ومتطلبات البنية التحتية. يتطلب Faust كافكا فقط، والباقي هو Python فقط، لذا إذا كنت تعرف Python، فيمكنك بالفعل استخدام Faust للقيام بمعالجة الدفق، ويمكن أن يتكامل مع أي شيء تقريبًا.
إليك أحد التطبيقات الأسهل التي يمكنك إجراؤها:
استيراد فاوست تحية الفصل (faust.Record): from_name: شارع to_name: شارع app = faust.App('hello-app', Broker='kafka://localhost') الموضوع = app.topic('مرحبًا بالموضوع'، value_type=ترحيب) @app.agent(الموضوع) غير متزامن ديه مرحبا (تحياتي): غير متزامن للتحية في التحية: طباعة (f'مرحبًا من {greeting.from_name} إلى {greeting.to_name}') @app.timer(الفاصل الزمني=1.0) غير متزامن def example_sender(app): انتظر مرحبا.أرسل( القيمة = تحية (from_name = "فاوست"، to_name = "أنت")، ) إذا كان __name__ == '__main__': التطبيق الرئيسي ()
من المحتمل أنك تخشى بعض الشيء من عدم المزامنة وتنتظر الكلمات الرئيسية، ولكن لا يتعين عليك معرفة كيفية عمل asyncio
لاستخدام Faust: فقط قم بتقليد الأمثلة، وستكون على ما يرام.
يبدأ تطبيق المثال مهمتين: إحداهما معالجة دفق، والأخرى عبارة عن سلسلة رسائل خلفية ترسل الأحداث إلى ذلك الدفق. في تطبيق واقعي، سيقوم نظامك بنشر الأحداث إلى موضوعات كافكا التي يمكن لمعالجاتك الاستهلاك منها، ويكون مؤشر ترابط الخلفية مطلوبًا فقط لإدخال البيانات في مثالنا.
يمكنك تثبيت Faust إما عبر Python Package Index (PyPI) أو من المصدر.
للتثبيت باستخدام النقطة:
$ pip install -U faust
يحدد Faust أيضًا مجموعة من ملحقات setuptools
التي يمكن استخدامها لتثبيت Faust والتبعيات الخاصة بميزة معينة.
يمكنك تحديد هذه العناصر في متطلباتك أو في سطر أوامر pip
باستخدام الأقواس. افصل بين الحزم المتعددة باستخدام الفاصلة:
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
الحزم التالية متوفرة:
faust[rocksdb] : | لاستخدام RocksDB لتخزين حالة جدول Faust. الموصى بها في الإنتاج. |
---|
faust[redis] : | لاستخدام Redis_ كواجهة خلفية بسيطة للتخزين المؤقت (نمط Memcached). |
---|
faust[yaml] : | لاستخدام YAML ومكتبة PyYAML في التدفقات. |
---|
faust[fast] : | لتثبيت جميع ملحقات تسريع C المتوفرة على Faust core. |
---|
faust[datadog] : | لاستخدام شاشة Datadog Faust. |
---|---|
faust[statsd] : | لاستخدام شاشة Statsd Faust. |
faust[uvloop] : | لاستخدام فاوست مع uvloop . |
---|---|
faust[eventlet] : | لاستخدام فاوست مع eventlet |
faust[debug] : | لاستخدام aiomonitor للاتصال وتصحيح عامل Faust قيد التشغيل. |
---|---|
faust[setproctitle] : | عندما يتم تثبيت وحدة setproctitle ، سيستخدمها عامل Faust لتعيين اسم عملية أفضل في قوائم ps / top . تم تثبيته أيضًا مع الحزم fast debug . |
قم بتنزيل أحدث إصدار من Faust من http://pypi.org/project/faust
يمكنك تثبيته عن طريق القيام بما يلي:
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
يجب تنفيذ الأمر الأخير كمستخدم مميز إذا كنت لا تستخدم Virtualenv حاليًا.
يمكنك تثبيت أحدث لقطة من Faust باستخدام أمر pip
التالي:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
نعم! استخدم eventlet
كجسر للتكامل مع asyncio
.
eventlet
يعمل هذا الأسلوب مع أي مكتبة Python محظورة يمكنها العمل مع eventlet
.
يتطلب استخدام eventlet
تثبيت وحدة aioeventlet
، ويمكنك تثبيتها كحزمة مع Faust:
$ pip install -U faust[eventlet]
ومن ثم لاستخدام eventslet فعليًا كحلقة حدث، عليك إما استخدام الوسيطة -L <faust --loop>
لبرنامج faust
:
$ faust -L eventlet -A myproj worker -l info
أو أضف import mode.loop.eventlet
في الجزء العلوي من البرنامج النصي لنقطة الدخول:
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
تحذير
من المهم جدًا أن يكون هذا في أعلى الوحدة، وأن يتم تنفيذه قبل استيراد المكتبات.
نعم! استخدم جسر tornado.platform.asyncio
: http://www.tornadoweb.org/en/stable/asyncio.html
نعم! استخدم تطبيق مفاعل asyncio
: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
لا، يتطلب Faust إصدار Python 3.6 أو إصدار أحدث، نظرًا لأنه يستخدم بشكل كبير الميزات التي تم تقديمها في Python 3.6 (التعليقات التوضيحية غير المتزامنة، والانتظار، والنوع المتغير).
قد تحتاج إلى زيادة الحد الأقصى لعدد الملفات المفتوحة. يشرح المنشور التالي كيفية القيام بذلك على نظام التشغيل OS X: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
يدعم فاوست كافكا بإصدار >= 0.10.
لإجراء مناقشات حول استخدام Faust وتطويره ومستقبله، يرجى الانضمام إلى `fauststream`_ Slack.
إذا كان لديك أي اقتراحات أو تقارير أخطاء أو إزعاجات، فيرجى الإبلاغ عنها إلى متتبع المشكلات لدينا على https://github.com/robinhood/faust/issues/
تم ترخيص هذا البرنامج بموجب ترخيص BSD الجديد. راجع ملف LICENSE
الموجود في دليل التوزيع العلوي للحصول على نص الترخيص الكامل.
يتم تطوير Faust في GitHub: https://github.com/robinhood/faust
نحن نشجعك بشدة على المشاركة في تطوير فاوست.
تأكد أيضًا من قراءة قسم المساهمة في فاوست في الوثائق.
من المتوقع أن يتبع كل من يتفاعل في قواعد أكواد المشروع ومتتبعي المشكلات وغرف الدردشة والقوائم البريدية قواعد سلوك فاوست.
باعتبارنا مساهمين ومشرفين على هذه المشاريع، ومن أجل تعزيز مجتمع مفتوح ومرحب، نتعهد باحترام جميع الأشخاص الذين يساهمون من خلال الإبلاغ عن المشكلات، ونشر طلبات الميزات، وتحديث الوثائق، وتقديم طلبات السحب أو التصحيحات، وغيرها من الأنشطة.
نحن ملتزمون بجعل المشاركة في هذه المشاريع تجربة خالية من التحرش للجميع، بغض النظر عن مستوى الخبرة أو الجنس أو الهوية الجنسية والتعبير أو التوجه الجنسي أو الإعاقة أو المظهر الشخصي أو حجم الجسم أو العرق أو الأصل العرقي أو العمر أو الدين أو جنسية.
تتضمن أمثلة السلوك غير المقبول من قبل المشاركين ما يلي:
يتمتع مشرفو المشروع بالحق والمسؤولية في إزالة أو تحرير أو رفض التعليقات والالتزامات والتعليمات البرمجية وتحريرات wiki والمشكلات والمساهمات الأخرى التي لا تتوافق مع قواعد السلوك هذه. من خلال اعتماد قواعد السلوك هذه، يلتزم القائمون على المشروع بتطبيق هذه المبادئ بشكل عادل ومتسق على كل جانب من جوانب إدارة هذا المشروع. قد تتم إزالة مشرفي المشروع الذين لا يتبعون أو ينفذون قواعد السلوك بشكل دائم من فريق المشروع.
تنطبق قواعد السلوك هذه داخل مساحات المشروع وفي الأماكن العامة عندما يمثل الفرد المشروع أو مجتمعه.
قد يتم الإبلاغ عن حالات السلوك المسيء أو المضايق أو غير المقبول عن طريق فتح مشكلة أو الاتصال بواحد أو أكثر من القائمين على المشروع.
تم تعديل قواعد السلوك هذه من ميثاق المساهمين، الإصدار 1.2.0 المتوفر على http://contributor-covenant.org/version/1/2/0/.