تكامل دفق الأحداث دون عناء لخدماتك
يقوم Faststream بتبسيط عملية كتابة المنتجين والمستهلكين لقوائم الرسائل ، والتعامل مع جميع التحليل والشبكات وتوليد التوثيق تلقائيًا.
إن جعل الخدمات المجهرية المتدفق أسهل. تم تصميم FastStream مع مراعاة المطورين المبتدئين ، ويبسيط عملك مع الحفاظ على الباب مفتوحًا لمزيد من حالات الاستخدام المتقدمة. فيما يلي نظرة على الميزات الأساسية التي تجعل Faststream إطارًا للخدمات الدقيقة الحديثة التي تركز على البيانات.
العديد من الوسطاء : يوفر Faststream واجهة برمجة تطبيقات موحدة للعمل عبر وسطاء رسائل متعددة ( Kafka ، RabbitMQ ، Nats ، Redis Support)
التحقق من صحة Pydantic : استفادة من قدرات التحقق من صحة Pydantic لتسلسلها والتحقق من صحة الرسائل الواردة
مستندات أوتوماتيكية : ابق مقدماً في وثائق Asyncapi التلقائية
بديهية : دعم المحرر الكامل من النوع يجعل تجربة التطوير الخاصة بك سلسة ، وتصطاد الأخطاء قبل أن تصل إلى وقت التشغيل
نظام حقن التبعية القوي : إدارة تبعيات الخدمة بك بكفاءة مع نظام DI المدمج في FastSream
يمكن اختباره : يدعم الاختبارات في الذاكرة ، مما يجعل خط أنابيب CI/CD أسرع وأكثر موثوقية
قابلة للتمديد : استخدم الامتدادات للعمر والتسلسل المخصص والأدوات الوسيطة
التكامل : Faststream متوافق تمامًا مع أي إطار HTTP الذي تريده ( Fastapi خاصة)
هذا faststream باختصار - سهلة وفعالة وقوية. سواء كنت تبدأ للتو بتدفق الخدمات المجهرية أو تتطلع إلى التوسع ، فقد قمت بتغطية FastStream .
الوثائق : https://faststream.airt.ai/latest/
FastStream هي حزمة جديدة تستند إلى الأفكار والخبرات المكتسبة من Fastkafka و Propan . من خلال الانضمام إلى قواتنا ، التقطنا أفضل ما في كلتا الحزمتين وأنشأنا طريقة موحدة لكتابة الخدمات القادرة على معالجة البيانات التي تم بثها بغض النظر عن البروتوكول الأساسي. سنستمر في الحفاظ على كلتا الحزمتين ، ولكن التطوير الجديد سيكون في هذا المشروع. إذا كنت تبدأ خدمة جديدة ، فإن هذه الحزمة هي الطريقة الموصى بها للقيام بذلك.
يعمل FastStream على Linux و MacOs و Windows وأنظمة تشغيل Unix -Style. يمكنك تثبيته مع pip
كالمعتاد:
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
بشكل افتراضي ، يستخدم FastStream Pydanticv2 المكتوب في الصدأ ، ولكن يمكنك تقليله يدويًا ، إذا لم يكن النظام الأساسي الخاص بك لديه دعم للصدأ - سيعمل Faststream بشكل صحيح مع Pydanticv1 أيضًا.
يوفر وسطاء FastStream ديكورات مريحة للوظائف @broker.subscriber
و @broker.publisher
للسماح لك بتفويض العملية الفعلية لـ:
استهلاك وإنتاج البيانات لقوائم الأحداث ، و
فك تشفير وترميز الرسائل المشفرة JSON
يسهل هذه الديكورات تحديد منطق المعالجة للمستهلكين والمنتجين ، مما يتيح لك التركيز على منطق العمل الأساسي لتطبيقك دون القلق بشأن التكامل الأساسي.
أيضًا ، يستخدم Faststream Pydantic لتحليل البيانات المشفرة JSON في كائنات Python ، مما يجعل من السهل العمل مع البيانات المنظمة في تطبيقاتك ، بحيث يمكنك تسلسل رسائل الإدخال الخاصة بك فقط باستخدام التعليقات التوضيحية للكوع.
فيما يلي مثال على تطبيق Python باستخدام FastStream الذي يستهلك البيانات من دفق بيانات وارد ويخرج البيانات إلى آخر:
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
أيضًا ، تتيح لك فئة Pydantic 's BaseModel
تحديد الرسائل باستخدام بناء جملة التصريحي ، مما يجعل من السهل تحديد الحقول وأنواع رسائلك.
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
يمكن اختبار الخدمة باستخدام مديري سياق TestBroker
، والتي ، بشكل افتراضي ، تضع الوسيط في "وضع الاختبار".
سيقوم المختبر بإعادة توجيه subscriber
publisher
المزينة إلى وسطاء inmemory ، مما يتيح لك اختبار تطبيقك بسرعة دون الحاجة إلى وسيط الجري وجميع تبعياته.
باستخدام Pytest ، سيبدو اختبار خدمتنا هكذا:
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
يمكن بدء التطبيق باستخدام أمر Faststream CLI مدمج.
قبل تشغيل الخدمة ، قم بتثبيت Faststream CLI باستخدام الأمر التالي:
pip install " faststream[cli] "
لتشغيل الخدمة ، استخدم الأمر Faststream CLI وقم بتمرير الوحدة النمطية (في هذه الحالة ، الملف الذي يوجد فيه تطبيق التطبيق) ورمز التطبيق إلى الأمر.
faststream run basic:app
بعد تشغيل الأمر ، يجب أن ترى الإخراج التالي:
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
أيضًا ، يوفر لك Faststream ميزة إعادة تحميل رائعة لتحسين تجربة التنمية الخاصة بك
faststream run basic:app --reload
وميزة التحجيم الأفقي متعدد المعالجة أيضًا:
faststream run basic:app --workers 3
يمكنك معرفة المزيد عن ميزات CLI هنا
يقوم FastStream تلقائيًا بإنشاء وثائق لمشروعك وفقًا لمواصفات Asyncapi . يمكنك العمل مع كل من القطع الأثرية التي تم إنشاؤها ووضع عرض ويب لوثائقك على الموارد المتاحة للفرق ذات الصلة.
إن توفر مثل هذه الوثائق يبسط بشكل كبير دمج الخدمات: يمكنك على الفور معرفة القنوات وتنسيقات الرسائل التي يعمل بها التطبيق. والأهم من ذلك ، أنه لن يكلف أي شيء - قام Faststream بالفعل بإنشاء المستندات لك!
يحتوي FastStream (بفضل BastDepends ) على نظام لإدارة التبعية مماثل pytest fixtures
و FastAPI Depends
في نفس الوقت. تعلن وسيطات الوظائف عن التبعيات التي تريدها ، ويقوم أحد الديكور الخاص بتسليمها من كائن السياق العالمي.
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
يمكنك استخدام FastStream MQBrokers
بدون تطبيق FastStream
. ما عليك سوى البدء وإيقافهم وفقًا لعمر طلبك.
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
أيضا ، يمكن استخدام Faststream كجزء من fastapi .
ما عليك سوى استيراد StreamRouter الذي تحتاجه وأعلن معالج الرسائل بنفس @router.subscriber(...)
و @router.publisher(...)
.
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
يمكن العثور على المزيد من ميزات التكامل هنا
يرجى إظهار دعمك والبقاء على اتصال:
إعطاء مستودع جيثب نجم ، و
الانضمام إلى خادم Discord الخاص بنا
الانضمام إلى مجموعة RU Telegram
يساعدنا دعمك على البقاء على اتصال معك ويشجعنا على مواصلة تطوير وتحسين الإطار. شكرا لك على دعمك!
شكرا لجميع هؤلاء الأشخاص المذهلين الذين جعلوا المشروع أفضل!