Легкая интеграция потока событий для ваших услуг
FastStream упрощает процесс написания производителей и потребителей для очередей сообщений, автоматического обработки всего анализа, сети и генерации документов.
Создание потоковых микросервисов никогда не было проще. Разработанный с учетом младших разработчиков, Faststream упрощает вашу работу, сохраняя при этом дверь открытой для более продвинутых вариантов использования. Вот посмотрите на основные функции, которые делают FastStream для современных микросервисов, ориентированных на данные.
Несколько брокеров : Faststream предоставляет унифицированный API для работы в нескольких брокерах сообщений ( Kafka , Rabbitmq , Nats , Redis Support)
Пайдантская проверка : Используйте возможности проверки Pydantic для сериализации и проверки входящих сообщений
Автоматические документы : оставайтесь вперед с автоматической документацией Asyncapi
Интуитивно понятная : полная поддержка редактора делает ваш опыт разработки плавным, улавливая ошибки, прежде чем они достигнут времени выполнения
Мощная система впрыска зависимостей : эффективно управлять зависимостью обслуживания со встроенной системой DI Faststream
Тестируемый : поддерживает тесты в памяти, делая ваш трубопровод CI/CD быстрее и надежным
Расширение : используйте расширения для продолжительности жизни, пользовательскую сериализацию и промежуточное программное обеспечение
Интеграции : FastStream полностью совместим с любым HTTP -структурой, который вы хотите (особенно в Fastapi )
Это Faststream в двух словах - Easy, эффективно и мощный. Независимо от того, начинаете ли вы только с потоковых микросервисов или хотите масштабировать, Faststream заставил вас покрыть.
Документация : https://faststream.airt.ai/latest/
Faststream - это новый пакет, основанный на идеях и опыте, полученных от Fastkafka и Propan . Присоединившись к нашим силам, мы взяли лучшее из обоих пакетов и создали единый способ записи услуг, способных обрабатывать потоковые данные независимо от базового протокола. Мы будем продолжать поддерживать оба пакета, но в этом проекте будет новая разработка. Если вы начинаете новый сервис, этот пакет является рекомендуемым способом сделать это.
FastStream работает над Linux , MacOS , Windows и большинством операционных систем Unix -Style. Вы можете установить его с pip
, как обычно:
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
По умолчанию Faststream использует Pydantyv2, написанный в Rust , но вы можете понизить его вручную, если на вашей платформе нет поддержки ржавчины - Faststream также будет работать правильно с Pydanticv1 .
Брокеры Faststream предоставляют удобные функциональные декораторы @broker.subscriber
и @broker.publisher
чтобы позволить вам делегировать фактический процесс:
потребление и производство данных в очереди событий, и
декодирование и кодирование сообщений, кодируемых JSON
Эти декораторы позволяют легко указать логику обработки для ваших потребителей и производителей, что позволяет вам сосредоточиться на основной бизнес -логике вашего приложения, не беспокоясь о базовой интеграции.
Кроме того, FastStream использует Pydantic для разбора данных, кодируемых JSON, в объекты Python, что позволяет легко работать со структурированными данными в ваших приложениях, так что вы можете сериализовать свои входные сообщения, просто используя аннотации типа.
Вот пример приложения Python с использованием FastStream , которое потребляет данные из входящего потока данных и выводит данные другому:
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
Кроме того, класс BaseModel
в Pydantic позволяет определять сообщения, используя декларативный синтаксис, что позволяет легко указать поля и типы ваших сообщений.
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
Сервис может быть протестирована с использованием контекстных менеджеров TestBroker
, что по умолчанию помещает брокера в «режим тестирования».
Тестер будет перенаправить ваши функции subscriber
и publisher
украшенных брокерами Inmemory, что позволит вам быстро протестировать ваше приложение без необходимости в бегущем брокере и всех его зависимостях.
Используя Pytest, тест для нашего сервиса будет выглядеть так:
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
Приложение может быть запущено с использованием встроенной команды FastStream CLI.
Перед запуском службы установите FastStream CLI, используя следующую команду:
pip install " faststream[cli] "
Чтобы запустить службу, используйте команду FastStream CLI и передайте модуль (в данном случае файл, в котором находится реализация приложения) и символ приложения в команду.
faststream run basic:app
После запуска команды вы должны увидеть следующий вывод:
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
Кроме того, FastStream предоставляет вам отличную функцию горячей перезагрузки для улучшения вашего опыта разработки
faststream run basic:app --reload
И многопроцессорное горизонтальное масштабирование также:
faststream run basic:app --workers 3
Вы можете узнать больше о функциях CLI здесь
Faststream автоматически генерирует документацию для вашего проекта в соответствии с спецификацией Asyncapi . Вы можете работать с как сгенерированными артефактами и разместить веб -представление о вашей документации по ресурсам, доступным для связанных команд.
Доступность такой документации значительно упрощает интеграцию услуг: вы можете немедленно увидеть, какие каналы и форматы сообщений работает, с которым работает приложение. И самое главное, это ничего не будет стоить - Faststream уже создал для вас документы!
FastStream (благодаря FASTDEDENDS ) имеет систему управления зависимостями, похожая на pytest fixtures
, и FastAPI Depends
в то же время. Функциональные аргументы заявляют, какие зависимости вам нужны, а специальный декоратор доставляет их из глобального контекстного объекта.
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
Вы можете использовать FastStream MQBrokers
без приложения FastStream
. Просто запустите и остановите их в соответствии с жизнью вашего приложения.
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
Кроме того, FastStream может использоваться как часть Fastapi .
Просто импортируйте необходимый вам streamrouter и объявите обработчик сообщений тем же @router.subscriber(...)
и @router.publisher(...)
декораторы.
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
Больше функций интеграции можно найти здесь
Пожалуйста, покажите свою поддержку и оставайтесь на связи:
давая нашему репозиторию GitHub звезду, и
Присоединение к нашему серверу EN Discord
Присоединение к нашей группе Ru Telegram
Ваша поддержка помогает нам оставаться на связи с вами и побуждает нас продолжать развивать и улучшать структуру. Спасибо за вашу поддержку!
Спасибо всем этим удивительным людям, которые сделали проект лучше!