为您的服务毫无轻松的事件流集成
FastStream简化了为消息队列编写生产者和消费者的过程,自动处理所有解析,网络和文档生成。
制作流微服务从未如此简单。 FastStream考虑了初级开发人员的设计,可以简化您的工作,同时为更高级用例保持开门。这是使Faststream成为现代以数据为中心微服务的首选框架的核心功能。
多个经纪人: FastStream提供了一个统一的API,可以在多个消息经纪( Kafka , RabbitMQ , NATS , REDIS支持)中工作
Pydantic验证:利用Pydantic的验证功能序列化和验证传入消息
自动文档:保持自动异步文档
直观:全型编辑器支持使您的开发体验变得顺利,在达到运行时会遇到错误
强大的依赖注入系统:通过FastStream的内置DI系统有效地管理服务依赖性
可测试:支持内存测试,使您的CI/CD管道更快,更可靠
扩展:使用扩展寿命,自定义序列化和中间件
集成: FastStream与您想要的任何HTTP框架完全兼容(尤其是FastAPI )
简而言之,这是快速的- 令人愉悦,高效且功能强大。无论您只是从流式传输微服务开始还是想扩展, FastStream都可以让您覆盖。
文档:https://fastStream.airt.ai/latest/
FastStream是一种基于Fastkafka和Propan所获得的思想和经验的新软件包。通过加入我们的部队,我们从两个软件包中获取了最好的选择,并创建了一种统一的方式来编写能够处理流数据的服务,而不论基础协议如何。我们将继续维护这两个软件包,但是这个项目将是新的开发项目。如果您要启动新服务,则建议这样做的方法。
FastStream在Linux , MacOS , Windows和大多数Unix式操作系统上都可以使用。您可以像往常一样使用pip
安装:
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
默认情况下, FastStream使用用Rust编写的Pydanticv2 ,但是您可以手动降级它,如果您的平台没有生锈支撑 - FastStream也可以与Pydanticv1合作。
FastStream Brokers提供方便的功能装饰器@broker.subscriber
和@broker.publisher
允许您委派实际过程:
将数据摄入和生产到活动队列,以及
解码和编码JSON编码的消息
这些装饰器使您可以轻松为消费者和生产者指定处理逻辑,从而使您可以专注于应用程序的核心业务逻辑,而不必担心基础集成。
此外, FastStream使用Pydantic将输入JSON编码的数据解析为Python对象,从而易于使用应用程序中的结构化数据,因此您只需使用类型注释即可序列化输入消息。
这是一个使用FastStream的示例Python应用程序,该应用程序将来自传入数据流的数据消费并将数据输出到另一个数据:
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
另外, Pydantic的BaseModel
类允许您使用声明语法定义消息,从而易于指定消息的字段和类型。
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
可以使用TestBroker
上下文管理器对该服务进行测试,默认情况下,该服务将经纪人置于“测试模式”。
测试仪将使您的subscriber
和publisher
装饰功能将您的订户装饰功能重定向到Inmemory Brokers,从而使您可以快速测试应用程序,而无需运行经纪人及其所有依赖项。
使用PYTEST,我们服务的测试看起来像这样:
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
可以使用内置的FastStream CLI命令开始应用程序。
在运行服务之前,请使用以下命令安装FastStream CLI :
pip install " faststream[cli] "
要运行服务,请使用FastStream CLI命令并传递模块(在这种情况下,将应用程序实现的位置)和app符号传递给命令。
faststream run basic:app
运行命令后,您应该看到以下输出:
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
此外, FastStream为您提供了一项出色的热装功能,以改善您的开发体验
faststream run basic:app --reload
以及多处理水平缩放特征:
faststream run basic:app --workers 3
您可以在此处了解有关CLI功能的更多信息
FastStream会根据异步规范自动为您的项目生成文档。您可以同时使用生成的工件,并在相关团队可用的资源上介绍文档的网络视图。
此类文档的可用性大大简化了服务的集成:您可以立即查看应用程序使用的渠道和消息格式。最重要的是,它不会花费任何东西 - FastStream已经为您创建了文档!
FastStream (得益于FastDepents )具有类似于pytest fixtures
的依赖关系管理系统,而FastAPI Depends
。函数参数声明您想要哪些依赖项,并且特殊的装饰器将它们从全局上下文对象中提供。
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
您可以在没有FastStream
应用程序的情况下使用FastStream MQBrokers
。只需根据您的应用程序的寿命开始并停止它们即可。
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
同样, FastStream可以用作FastApi的一部分。
只需导入您需要的streamRouter ,然后用相同的@router.subscriber(...)
和@router.publisher(...)
装饰器声明消息处理程序。
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
可以在此处找到更多集成功能
请通过以下方式保持支持并保持联系:
给我们的github存储库一个明星,然后
加入我们的en Discord服务器
加入我们的ru电报小组
您的支持有助于我们与您保持联系,并鼓励我们继续开发和改进框架。谢谢您的支持!
感谢所有使该项目变得更好的了这些惊人的人!