サービスのための楽なイベントストリーム統合
FastStreamは、プロデューサーと消費者を執筆するプロセスをメッセージキューに簡素化し、すべての解析、ネットワーク、ドキュメントの生成を自動的に処理します。
ストリーミングマイクロサービスを作成することはかつてないほど容易になりました。ジュニア開発者を念頭に置いて設計されたFastStreamは、より高度なユースケースのためにドアを開いたままにしながら、作業を簡素化します。 FastStreamを最新のデータ中心のマイクロサービスの頼りになるフレームワークにするコア機能をご覧ください。
複数のブローカー: FastStreamは、複数のメッセージブローカー( Kafka 、 Rabbitmq 、 Nats 、 Redisサポート)で動作するために統一されたAPIを提供します
Pydantic Validation : Pydanticの検証機能を活用して、着信メッセージをシリアル化および検証します
自動ドキュメント:自動Asyncapiドキュメントを進めてください
直感的:フルタイプのエディターサポートにより、開発エクスペリエンスがスムーズになり、ランタイムに到達する前にエラーをキャッチします
強力な依存関係インジェクションシステム: FastStreamの組み込みDIシステムでサービス依存関係を効率的に管理する
テスト可能:メモリ内テストをサポートし、CI/CDパイプラインをより速く、より信頼性を高める
拡張可能:寿命、カスタムシリアル化、ミドルウェアに拡張機能を使用します
統合: FastStreamは、必要なHTTPフレームワークと完全に互換性があります(特にFastapi )
これは、簡単で効率的で強力で、一言で言えば高速です。ストリーミングマイクロサービスを始めたばかりであろうと、スケーリングを探している場合でも、 FastStreamがカバーされています。
ドキュメント:https://faststream.airt.ai/latest/
FastStreamは、 FastKafkaとPropanから得たアイデアと経験に基づいた新しいパッケージです。軍隊に参加することで、両方のパッケージからベストを獲得し、基礎となるプロトコルに関係なく、ストリーミングされたデータを処理できるサービスを作成する統一された方法を作成しました。引き続き両方のパッケージを維持しますが、このプロジェクトには新しい開発が行われます。新しいサービスを開始している場合、このパッケージはそれを行うための推奨方法です。
FastStreamは、Linux 、 MacOS 、 Windows 、およびMost Unixスタイルのオペレーティングシステムで動作します。いつものようにpip
でインストールできます。
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
デフォルトでは、 FastStreamはRustで書かれたPydanticv2を使用しますが、プラットフォームにRustサポートがない場合は、手動でダウングレードできます。FastStreamはPydanticv1でも正しく動作します。
FastStream Brokersは、便利な機能Decorators @broker.subscriber
および@broker.publisher
を提供して、次のプロセスを委任できるようにします。
イベントキューへのデータの消費と生産、および
JSONエンコードメッセージのデコードとエンコード
これらのデコレータにより、消費者や生産者向けの処理ロジックを簡単に指定できるようになり、基礎となる統合を心配することなく、アプリケーションのコアビジネスロジックに集中できます。
また、 FastStreamはPydanticを使用して入力JSONエンコードデータをPythonオブジェクトに解析し、アプリケーションで構造化されたデータを簡単に操作できるため、入力メッセージをタイプの注釈だけでシリアル化できます。
次に、 FastStreamを使用したPythonアプリの例です。これは、着信データストリームからデータを消費し、データを別のデータに出力します。
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
また、 PydanticのBaseModel
クラスを使用すると、宣言的構文を使用してメッセージを定義できるため、メッセージのフィールドとタイプを簡単に指定できます。
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
このサービスは、 TestBroker
コンテキストマネージャーを使用してテストできます。このマネージャーは、デフォルトでブローカーを「テストモード」にします。
テスターは、 subscriber
とpublisher
装飾機能をインメモリーブローカーにリダイレクトし、ランニングブローカーとそのすべての依存関係を必要とせずにアプリをすばやくテストすることができます。
Pytestを使用すると、サービスのテストは次のようになります。
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
アプリケーションは、組み込みのFastStream CLIコマンドを使用して開始できます。
サービスを実行する前に、次のコマンドを使用してFastStream CLIをインストールします。
pip install " faststream[cli] "
サービスを実行するには、 FastStream CLIコマンドを使用して、モジュール(この場合、アプリの実装があるファイル)とアプリシンボルをコマンドに渡します。
faststream run basic:app
コマンドを実行した後、次の出力が表示されます。
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
また、 FastStreamは、開発エクスペリエンスを向上させるための優れたホットリロード機能を提供します
faststream run basic:app --reload
マルチプロセッシング水平スケーリング機能も次のとおりです。
faststream run basic:app --workers 3
CLI機能の詳細については、こちらをご覧ください
FastStreamは、 Asyncapi仕様に従ってプロジェクトのドキュメントを自動的に生成します。生成されたアーティファクトの両方を使用して、関連チームが利用できるリソースに関するドキュメントのWebビューを配置できます。
このようなドキュメントの可用性により、サービスの統合が大幅に簡素化されます。アプリケーションが機能するチャネルとメッセージ形式をすぐに確認できます。そして最も重要なことは、それは何にもかかわらないことです - FastStreamはすでにあなたのためにドキュメントを作成しています!
FastStream ( FastDependsのおかげ)には、 pytest fixtures
と同様の依存関係管理システムがあり、 FastAPI Depends
。関数引数は、必要な依存関係を宣言し、特別なデコレーターがグローバルコンテキストオブジェクトからそれらを配信します。
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
FastStream
アプリケーションなしでFastStream MQBrokers
使用できます。アプリケーションの寿命に従って開始して停止するだけです。
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
また、 FastStreamをFastapiの一部として使用できます。
必要なStreamRouterをインポートし、同じ@router.subscriber(...)
および@router.publisher(...)
デコレーターでメッセージハンドラーを宣言します。
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
より多くの統合機能はここにあります
あなたのサポートを示して、次のように連絡を取り合ってください。
githubリポジトリに星を与える
EN Discordサーバーに参加します
Ru Telegramグループに参加します
あなたのサポートは、私たちがあなたと連絡を取り合い、フレームワークの開発と改善を続けることを奨励するのに役立ちます。ご支援ありがとうございます!
プロジェクトをより良くしたこれらの素晴らしい人々のすべてに感謝します!