การรวมสตรีมเหตุการณ์อย่างง่ายดายสำหรับบริการของคุณ
Faststream ทำให้กระบวนการเขียนผู้ผลิตและผู้บริโภคง่ายขึ้นสำหรับคิวข้อความจัดการการแยกวิเคราะห์การสร้างเครือข่ายและการสร้างเอกสารทั้งหมดโดยอัตโนมัติ
การสร้างไมโครเซิร์มมิ่งไม่เคยง่ายกว่านี้มาก่อน ออกแบบโดยคำนึงถึงนักพัฒนาจูเนียร์ Faststream ทำให้งานของคุณง่ายขึ้นในขณะที่เปิดประตูสำหรับกรณีการใช้งานขั้นสูงมากขึ้น นี่คือคุณสมบัติหลักที่ทำให้ Faststream เป็นเฟรมเวิร์กแบบ go-to สำหรับ microservices ที่ทันสมัยและเป็นศูนย์กลาง
โบรกเกอร์หลายตัว : Faststream จัดเตรียม API แบบครบวงจรให้ทำงานกับโบรกเกอร์หลายข้อความ ( Kafka , RabbitMQ , NATS , Redis Support)
การตรวจสอบความถูกต้องของ Pydantic : ใช้ประโยชน์จากความสามารถในการตรวจสอบความถูกต้อง ของ Pydantic เพื่อทำให้เป็นอนุกรมและตรวจสอบข้อความที่เข้ามา
เอกสารอัตโนมัติ : อยู่ข้างหน้าด้วยเอกสาร Asyncapi อัตโนมัติ
ใช้งานง่าย : การสนับสนุนตัวแก้ไขแบบเต็มรูปแบบทำให้ประสบการณ์การพัฒนาของคุณราบรื่นและจับข้อผิดพลาดก่อนที่จะถึงรันไทม์
ระบบการฉีดพึ่งพาการพึ่งพาที่มีประสิทธิภาพ : จัดการการพึ่งพาบริการของคุณอย่างมีประสิทธิภาพด้วยระบบ DI ในตัวของ Faststream
ทดสอบได้ : รองรับการทดสอบในหน่วยความจำทำให้ท่อส่ง CI/CD ของคุณเร็วขึ้นและเชื่อถือได้มากขึ้น
Extensible : ใช้ส่วนขยายสำหรับอายุการใช้งานการทำให้เป็นอนุกรมและมิดเดิลแวร์ที่กำหนดเอง
การรวม : Faststream เข้ากันได้อย่างสมบูรณ์กับกรอบ HTTP ใด ๆ ที่คุณต้องการ ( Fastapi โดยเฉพาะ)
นั่นคือ faststream โดยสรุป - ง่าย, มีประสิทธิภาพและมีประสิทธิภาพ ไม่ว่าคุณจะเพิ่งเริ่มต้นด้วยการสตรีม Microservices หรือกำลังมองหามาตราส่วน Faststream ทำให้คุณได้รับความคุ้มครอง
เอกสาร : https://faststream.airt.ai/latest/
Faststream เป็นแพ็คเกจใหม่ตามแนวคิดและประสบการณ์ที่ได้รับจาก Fastkafka และ Propan ด้วยการเข้าร่วมกองกำลังของเราเราได้รับสิ่งที่ดีที่สุดจากทั้งสองแพ็คเกจและสร้างวิธีการเขียนแบบครบวงจรในการเขียนบริการที่สามารถประมวลผลข้อมูลสตรีมได้โดยไม่คำนึงถึงโปรโตคอลพื้นฐาน เราจะรักษาทั้งสองแพ็คเกจต่อไป แต่การพัฒนาใหม่จะอยู่ในโครงการนี้ หากคุณกำลังเริ่มบริการใหม่แพ็คเกจนี้เป็นวิธีที่แนะนำในการทำ
Faststream ทำงานบน Linux , MacOS , Windows และระบบปฏิบัติการสไตล์ UNIX ส่วนใหญ่ คุณสามารถติดตั้งด้วย pip
ได้ตามปกติ:
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
โดยค่าเริ่มต้น faststream ใช้ pydanticv2 ที่เขียนด้วย Rust แต่คุณสามารถปรับลดรุ่นด้วยตนเองได้หากแพลตฟอร์มของคุณไม่มีการสนับสนุน สนิม - Faststream จะทำงานได้อย่างถูกต้องกับ PydanticV1 เช่นกัน
โบรกเกอร์ faststream ให้บริการนักตกแต่งที่สะดวกสบาย @broker.subscriber
และ @broker.publisher
เพื่อให้คุณสามารถมอบหมายกระบวนการจริงของ:
การบริโภคและผลิตข้อมูลไปยังคิวกิจกรรมและ
การถอดรหัสและเข้ารหัสข้อความที่เข้ารหัส JSON
นักตกแต่งเหล่านี้ทำให้ง่ายต่อการระบุตรรกะการประมวลผลสำหรับผู้บริโภคและผู้ผลิตของคุณทำให้คุณสามารถมุ่งเน้นไปที่ตรรกะทางธุรกิจหลักของแอปพลิเคชันของคุณโดยไม่ต้องกังวลเกี่ยวกับการรวมระบบ
นอกจากนี้ Faststream ใช้ Pydantic เพื่อแยกวิเคราะห์ข้อมูลที่เข้ารหัส JSON ในวัตถุ Python ทำให้ง่ายต่อการทำงานกับข้อมูลที่มีโครงสร้างในแอปพลิเคชันของคุณดังนั้นคุณสามารถทำให้ข้อความอินพุตของคุณเป็นอนุกรมโดยใช้คำอธิบายประกอบประเภท
นี่คือตัวอย่างแอพ Python โดยใช้ faststream ที่ใช้ข้อมูลจากสตรีมข้อมูลที่เข้ามาและส่งออกข้อมูลไปยังอีกอันหนึ่ง:
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
นอกจากนี้คลาส BaseModel
ของ Pydantic ช่วยให้คุณกำหนดข้อความโดยใช้ไวยากรณ์ที่ประกาศทำให้ง่ายต่อการระบุฟิลด์และประเภทของข้อความของคุณ
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
บริการสามารถทดสอบได้โดยใช้ผู้จัดการบริบท TestBroker
ซึ่งโดยค่าเริ่มต้นจะทำให้นายหน้าเป็น "โหมดการทดสอบ"
ผู้ทดสอบจะเปลี่ยนเส้นทาง subscriber
และ publisher
ของคุณไปยังโบรกเกอร์ Inmemory ของคุณซึ่งจะช่วยให้คุณทดสอบแอปของคุณได้อย่างรวดเร็วโดยไม่จำเป็นต้องมีโบรกเกอร์ที่กำลังทำงานอยู่และการพึ่งพาทั้งหมด
การใช้ pytest การทดสอบสำหรับบริการของเราจะเป็นแบบนี้:
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
แอปพลิเคชันสามารถเริ่มต้นโดยใช้คำสั่ง Faststream CLI ในตัว
ก่อนเรียกใช้บริการให้ติดตั้ง Faststream CLI โดยใช้คำสั่งต่อไปนี้:
pip install " faststream[cli] "
ในการเรียกใช้บริการให้ใช้คำสั่ง FastStream CLI และส่งผ่านโมดูล (ในกรณีนี้ไฟล์ที่การใช้งานแอปตั้งอยู่) และสัญลักษณ์แอพไปยังคำสั่ง
faststream run basic:app
หลังจากเรียกใช้คำสั่งคุณควรเห็นผลลัพธ์ต่อไปนี้:
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
นอกจากนี้ Faststream ยังให้คุณสมบัติการโหลดใหม่ที่ยอดเยี่ยมแก่คุณเพื่อปรับปรุงประสบการณ์การพัฒนาของคุณ
faststream run basic:app --reload
และคุณลักษณะการปรับขนาดแนวนอนหลายครั้งเช่นกัน:
faststream run basic:app --workers 3
คุณสามารถเรียนรู้เพิ่มเติมเกี่ยวกับคุณสมบัติ CLI ได้ที่นี่
Faststream สร้างเอกสารสำหรับโครงการของคุณโดยอัตโนมัติตามข้อกำหนดของ Asyncapi คุณสามารถทำงานกับทั้งสิ่งประดิษฐ์ที่สร้างขึ้นและวางมุมมองเว็บของเอกสารเกี่ยวกับทรัพยากรที่มีให้กับทีมที่เกี่ยวข้อง
ความพร้อมใช้งานของเอกสารดังกล่าวทำให้การรวมบริการง่ายขึ้นอย่างมาก: คุณสามารถดูได้ทันทีว่าช่องและรูปแบบข้อความที่แอปพลิเคชันทำงานได้ทันที และที่สำคัญที่สุดมันจะไม่เสียค่าใช้จ่ายอะไรเลย - Faststream ได้สร้างเอกสารให้คุณแล้ว!
Faststream (ขอบคุณ FastDepends ) มีระบบการจัดการการพึ่งพาคล้ายกับ pytest fixtures
และ FastAPI Depends
ในเวลาเดียวกัน ข้อโต้แย้งของฟังก์ชั่นประกาศว่าคุณต้องการการพึ่งพาใดและมัณฑนากรพิเศษส่งมอบจากวัตถุบริบททั่วโลก
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
คุณสามารถใช้ faststream MQBrokers
โดยไม่ต้องใช้แอปพลิเคชัน FastStream
เพียง เริ่มต้น และ หยุด พวกเขาตามอายุขัยของแอปพลิเคชันของคุณ
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
นอกจากนี้ Faststream สามารถใช้เป็นส่วนหนึ่งของ fastapi
เพียงแค่นำเข้า สตรีม ที่คุณต้องการและประกาศตัวจัดการข้อความด้วย @router.subscriber(...)
และ @router.publisher(...)
นักตกแต่ง
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
สามารถพบคุณสมบัติการรวมเพิ่มเติมได้ที่นี่
โปรดแสดงการสนับสนุนของคุณและติดต่อโดย:
ให้ดาวที่เก็บ GitHub ของเราและ
เข้าร่วมเซิร์ฟเวอร์ EN Discord ของเรา
เข้าร่วมกลุ่ม Ru Telegram ของเรา
การสนับสนุนของคุณช่วยให้เราติดต่อกับคุณและกระตุ้นให้เราพัฒนาและปรับปรุงกรอบงานต่อไป ขอบคุณสำหรับการสนับสนุน!
ขอบคุณคนที่น่าทึ่งเหล่านี้ที่ทำให้โครงการดีขึ้น!