為您的服務毫無輕鬆的事件流集成
FastStream簡化了為消息隊列編寫生產者和消費者的過程,自動處理所有解析,網絡和文檔生成。
製作流微服務從未如此簡單。 FastStream考慮了初級開發人員的設計,可以簡化您的工作,同時為更高級用例保持開門。這是使Faststream成為現代以數據為中心微服務的首選框架的核心功能。
多個經紀人: FastStream提供了一個統一的API,可以在多個消息經紀( Kafka , RabbitMQ , NATS , REDIS支持)中工作
Pydantic驗證:利用Pydantic的驗證功能序列化和驗證傳入消息
自動文檔:保持自動異步文檔
直觀:全型編輯器支持使您的開發體驗變得順利,在達到運行時會遇到錯誤
強大的依賴注入系統:通過FastStream的內置DI系統有效地管理服務依賴性
可測試:支持內存測試,使您的CI/CD管道更快,更可靠
擴展:使用擴展壽命,自定義序列化和中間件
集成: FastStream與您想要的任何HTTP框架完全兼容(尤其是FastAPI )
簡而言之,這是快速的- 令人愉悅,高效且功能強大。無論您只是從流式傳輸微服務開始還是想擴展, FastStream都可以讓您覆蓋。
文檔:https://fastStream.airt.ai/latest/
FastStream是一種基於Fastkafka和Propan所獲得的思想和經驗的新軟件包。通過加入我們的部隊,我們從兩個軟件包中獲取了最好的選擇,並創建了一種統一的方式來編寫能夠處理流數據的服務,而不論基礎協議如何。我們將繼續維護這兩個軟件包,但是這個項目將是新的開發項目。如果您要啟動新服務,則建議這樣做的方法。
FastStream在Linux , MacOS , Windows和大多數Unix式操作系統上都可以使用。您可以像往常一樣使用pip
安裝:
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
默認情況下, FastStream使用用Rust編寫的Pydanticv2 ,但是您可以手動降級它,如果您的平台沒有生鏽支撐 - FastStream也可以與Pydanticv1合作。
FastStream Brokers提供方便的功能裝飾器@broker.subscriber
和@broker.publisher
允許您委派實際過程:
將數據攝入和生產到活動隊列,以及
解碼和編碼JSON編碼的消息
這些裝飾器使您可以輕鬆為消費者和生產者指定處理邏輯,從而使您可以專注於應用程序的核心業務邏輯,而不必擔心基礎集成。
此外, FastStream使用Pydantic將輸入JSON編碼的數據解析為Python對象,從而易於使用應用程序中的結構化數據,因此您只需使用類型註釋即可序列化輸入消息。
這是一個使用FastStream的示例Python應用程序,該應用程序將來自傳入數據流的數據消費並將數據輸出到另一個數據:
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
另外, Pydantic的BaseModel
類允許您使用聲明語法定義消息,從而易於指定消息的字段和類型。
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
可以使用TestBroker
上下文管理器對該服務進行測試,默認情況下,該服務將經紀人置於“測試模式”。
測試儀將使您的subscriber
和publisher
裝飾功能將您的訂戶裝飾功能重定向到Inmemory Brokers,從而使您可以快速測試應用程序,而無需運行經紀人及其所有依賴項。
使用PYTEST,我們服務的測試看起來像這樣:
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
可以使用內置的FastStream CLI命令開始應用程序。
在運行服務之前,請使用以下命令安裝FastStream CLI :
pip install " faststream[cli] "
要運行服務,請使用FastStream CLI命令並傳遞模塊(在這種情況下,將應用程序實現的位置)和app符號傳遞給命令。
faststream run basic:app
運行命令後,您應該看到以下輸出:
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
此外, FastStream為您提供了一項出色的熱裝功能,以改善您的開發體驗
faststream run basic:app --reload
以及多處理水平縮放特徵:
faststream run basic:app --workers 3
您可以在此處了解有關CLI功能的更多信息
FastStream會根據異步規範自動為您的項目生成文檔。您可以同時使用生成的工件,並在相關團隊可用的資源上介紹文檔的網絡視圖。
此類文檔的可用性大大簡化了服務的集成:您可以立即查看應用程序使用的渠道和消息格式。最重要的是,它不會花費任何東西 - FastStream已經為您創建了文檔!
FastStream (得益於FastDepents )具有類似於pytest fixtures
的依賴關係管理系統,而FastAPI Depends
。函數參數聲明您想要哪些依賴項,並且特殊的裝飾器將它們從全局上下文對像中提供。
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
您可以在沒有FastStream
應用程序的情況下使用FastStream MQBrokers
。只需根據您的應用程序的壽命開始並停止它們即可。
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
同樣, FastStream可以用作FastApi的一部分。
只需導入您需要的streamRouter ,然後用相同的@router.subscriber(...)
和@router.publisher(...)
裝飾器聲明消息處理程序。
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
可以在此處找到更多集成功能
請通過以下方式保持支持並保持聯繫:
給我們的github存儲庫一個明星,然後
加入我們的en Discord服務器
加入我們的ru電報小組
您的支持有助於我們與您保持聯繫,並鼓勵我們繼續開發和改進框架。謝謝您的支持!
感謝所有使該項目變得更好的了這些驚人的人!