Integração sem esforço de fluxo de eventos para seus serviços
O FastStream simplifica o processo de escrever produtores e consumidores para filas de mensagens, manipulando todas as gerações de análise, rede e documentação automaticamente.
Fazer microsserviços de streaming nunca foi tão fácil. Projetado com desenvolvedores juniores em mente, o FastStream simplifica seu trabalho enquanto mantém a porta aberta para casos de uso mais avançados. Aqui está uma olhada nos recursos centrais que fazem do FastStream uma estrutura preferida para microsserviços modernos e centrados em dados.
Vários corretores : o FastStream fornece uma API unificada para trabalhar em vários corretores de mensagens ( Kafka , RabbitMQ , Nats , Redis Support)
Validação Pydantic : alavancar os recursos de validação da Pydantic para serializar e validar mensagens recebidas
Documentos automáticos : fique à frente com documentação automática de Asyncapi
Intuitivo : o suporte do editor completo torna sua experiência de desenvolvimento suave e de captura antes de atingirem o tempo de execução
Sistema poderoso de injeção de dependência : gerencie suas dependências de serviço com eficiência com o sistema DI integrado da FastStream
Testável : suporta testes na memória, tornando seu pipeline CI/CD mais rápido e mais confiável
Extensible : Use extensões para vida útil, serialização personalizada e middleware
Integrações : FastStream é totalmente compatível com qualquer estrutura HTTP que você desejar (especialmente o FASTAPI )
Isso é rápido em poucas palavras - fácil, eficiente e poderoso. Esteja você apenas começando com o streaming de microsserviços ou procurando escalar, o FastStream o abriu.
Documentação : https://faststream.airt.ai/latest/
O FastStream é um novo pacote baseado nas idéias e experiências obtidas com Fastkafka e Propan . Ao ingressar em nossas forças, pegamos o melhor dos dois pacotes e criamos uma maneira unificada de escrever serviços capazes de processar dados transmitidos, independentemente do protocolo subjacente. Continuaremos a manter os dois pacotes, mas um novo desenvolvimento estará neste projeto. Se você estiver iniciando um novo serviço, este pacote é a maneira recomendada de fazê -lo.
O FastStream funciona em sistemas operacionais Linux , MacOS , Windows e a maioria do estilo Unix . Você pode instalá -lo com pip
como de costume:
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
Por padrão, o FastStream usa o pydanticv2 escrito em ferrugem , mas você pode fazer o downgrade manualmente, se sua plataforma não tiver suporte de ferrugem - o FastStream funcionará corretamente com o Pydanticv1 também.
Os corretores FastStream fornecem decoradores de funções convenientes @broker.subscriber
e @broker.publisher
para permitir que você delegue o processo real de:
consumir e produzir dados para filas de eventos e
decodificar e codificar mensagens codificadas por JSON
Esses decoradores facilitam a especificação da lógica de processamento para seus consumidores e produtores, permitindo que você se concentre na lógica de negócios principal do seu aplicativo, sem se preocupar com a integração subjacente.
Além disso, o FastStream usa dados pydantic para analisar os dados codificados por JSON em objetos Python, facilitando o trabalho com dados estruturados em seus aplicativos, para que você possa serializar suas mensagens de entrada apenas usando anotações de tipo.
Aqui está um exemplo de aplicativo Python usando o FastStream que consome dados de um fluxo de dados recebidos e produz os dados para outro:
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
Além disso, a classe BaseModel
da Pydantic permite definir mensagens usando uma sintaxe declarativa, facilitando a especificação dos campos e tipos de suas mensagens.
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
O serviço pode ser testado usando os gerentes de contexto TestBroker
, que, por padrão, colocam o corretor no "modo de teste".
O testador redirecionará as funções decoradas do subscriber
e publisher
para os corretores InMemory, permitindo que você teste rapidamente seu aplicativo sem a necessidade de um corretor de corrida e todas as suas dependências.
Usando o Pytest, o teste para o nosso serviço ficaria assim:
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
O aplicativo pode ser iniciado usando o comando interno da CLI FastStream .
Antes de executar o serviço, instale a CLI FastStream usando o seguinte comando:
pip install " faststream[cli] "
Para executar o serviço, use o comando FastStream CLI e passe no módulo (neste caso, o arquivo em que a implementação do aplicativo está localizada) e o símbolo do aplicativo para o comando.
faststream run basic:app
Depois de executar o comando, você deve ver a seguinte saída:
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
Além disso, o FastStream fornece um ótimo recurso de recarga quente para melhorar sua experiência de desenvolvimento
faststream run basic:app --reload
E recurso de escala horizontal multiprocessante também:
faststream run basic:app --workers 3
Você pode aprender mais sobre os recursos da CLI aqui
O FastStream gera automaticamente a documentação para o seu projeto de acordo com a especificação Asyncapi . Você pode trabalhar com artefatos gerados e colocar uma visão da sua documentação sobre os recursos disponíveis para equipes relacionadas.
A disponibilidade dessa documentação simplifica significativamente a integração dos serviços: você pode ver imediatamente com quais canais e formatos de mensagem com os quais o aplicativo funciona. E o mais importante, não custará nada - o FastStream já criou os documentos para você!
O FastStream (graças ao FastDendends ) possui um sistema de gerenciamento de dependência semelhante aos pytest fixtures
e FastAPI Depends
ao mesmo tempo. Os argumentos da função declaram quais dependências você deseja e um decorador especial os entrega do objeto de contexto global.
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
Você pode usar o FastStream MQBrokers
sem um aplicativo FastStream
. Basta começar e detê -los de acordo com a vida útil do seu aplicativo.
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
Além disso, o FastStream pode ser usado como parte do FASTAPI .
Basta importar um streamRouter que você precisa e declarar o manipulador de mensagens com o mesmo @router.subscriber(...)
e @router.publisher(...)
Decoradores.
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
Mais recursos de integração podem ser encontrados aqui
Mostre seu apoio e mantenha contato por:
dando ao nosso repositório de github uma estrela, e
Juntando -se ao nosso servidor EN Discord
Juntando -se ao nosso grupo Ru Telegram
Seu apoio nos ajuda a manter contato com você e nos incentiva a continuar desenvolvendo e melhorando a estrutura. Obrigado pelo seu apoio!
Obrigado a todas essas pessoas incríveis que tornaram o projeto melhor!