Intégration de flux d'événements sans effort pour vos services
FastStream simplifie le processus d'écriture des producteurs et des consommateurs pour les files d'attente de messages, gérant automatiquement toutes les analyses de l'analyse, de réseautage et de documentation.
Faire des microservices en streaming n'a jamais été aussi simple. Conçu avec les développeurs juniors à l'esprit, FastStream simplifie votre travail tout en gardant la porte ouverte pour des cas d'utilisation plus avancés. Voici un aperçu des fonctionnalités de base qui font de FastStream un framework incontournable pour les microservices modernes et centrés sur les données.
Plusieurs courtiers : FastStream fournit une API unifiée pour fonctionner sur plusieurs courtiers de messages ( Kafka , Rabbitmq , Nats , Redis Support)
Validation pydatique : tirer parti des capacités de validation de Pyndantique pour sérialiser et valider les messages entrants
Docs automatiques : Restez en avance avec la documentation automatique asyncapi
Intuitif : le support de l'éditeur complet rend votre expérience de développement en douceur, des erreurs de capture avant d'atteindre le temps d'exécution
Système d'injection de dépendance puissante : gérer efficacement vos dépendances de service avec le système DI intégré de FastStream
Testable : prend en charge les tests en mémoire, ce qui rend votre pipeline CI / CD plus rapidement et plus fiable
Extensible : utilisez des extensions pour la durée de vie, la sérialisation personnalisée et le middleware
INTERGATIONS : FastStream est entièrement compatible avec tout cadre HTTP que vous souhaitez ( Fastapi en particulier)
C'est rapide en un mot - facile, efficace et puissant. Que vous commenciez simplement par les microservices en streaming ou que vous cherchiez à évoluer, FastStream vous a couvert.
Documentation : https://faststream.airt.ai/latest/
FastStream est un nouveau package basé sur les idées et les expériences acquises par Fastkafka et Propan . En rejoignant nos forces, nous avons pris le meilleur des deux packages et créé une façon unifiée d'écrire des services capables de traiter les données en streaming quel que soit le protocole sous-jacent. Nous continuerons à maintenir les deux packages, mais le nouveau développement sera dans ce projet. Si vous démarrez un nouveau service, ce package est le moyen recommandé de le faire.
FastStream fonctionne sur Linux , MacOS , Windows et les systèmes d'exploitation de style la plupart des UNIX . Vous pouvez l'installer avec pip
comme d'habitude:
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
Par défaut, FastStream utilise PydanticV2 écrite en rouille , mais vous pouvez le rétrograder manuellement, si votre plate-forme n'a pas de support de rouille - FastStream fonctionnera également correctement avec PyndanticV1 .
Les courtiers FastStream fournissent des décorateurs de fonctions pratiques @broker.subscriber
et @broker.publisher
pour vous permettre de déléguer le processus réel de:
consommer et produire des données aux files d'attente d'événements, et
Décodage et codage des messages en codés JSON
Ces décorateurs facilitent la spécification de la logique de traitement de vos consommateurs et producteurs, vous permettant de vous concentrer sur la logique métier principale de votre application sans vous soucier de l'intégration sous-jacente.
En outre, FastStream utilise Pydontic pour analyser les données codées JSON à saisir dans les objets Python, ce qui facilite le travail avec des données structurées dans vos applications, afin que vous puissiez sérialiser vos messages d'entrée uniquement à l'aide d'annotations de type.
Voici un exemple d'application Python utilisant FastStream qui consomme des données à partir d'un flux de données entrant et publie les données à une autre:
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
En outre, la classe BaseModel
de Pydontic vous permet de définir des messages à l'aide d'une syntaxe déclarative, ce qui facilite la spécification des champs et des types de vos messages.
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
Le service peut être testé à l'aide des gestionnaires de contexte TestBroker
, qui, par défaut, met le courtier en "mode de test".
Le testeur redirigera votre subscriber
et publisher
décoré des fonctions vers les courtiers InMemory, vous permettant de tester rapidement votre application sans avoir besoin d'un courtier en cours d'exécution et de toutes ses dépendances.
En utilisant Pytest, le test pour notre service ressemblerait à ceci:
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
L'application peut être démarrée à l'aide de la commande CLI FastStream intégrée.
Avant d'exécuter le service, installez FastStream CLI en utilisant la commande suivante:
pip install " faststream[cli] "
Pour exécuter le service, utilisez la commande CLI FastStream et transmettez le module (dans ce cas, le fichier où se trouve l'implémentation de l'application) et le symbole de l'application à la commande.
faststream run basic:app
Après avoir exécuté la commande, vous devriez voir la sortie suivante:
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
De plus, FastStream vous offre une excellente fonctionnalité de rechargement chaud pour améliorer votre expérience de développement
faststream run basic:app --reload
Et multiproceser la fonction de mise à l'échelle horizontale également:
faststream run basic:app --workers 3
Vous pouvez en savoir plus sur les fonctionnalités CLI ici
FastStream génère automatiquement la documentation de votre projet en fonction de la spécification ASyncAPI . Vous pouvez travailler avec les deux artefacts générés et placer une vue Web de votre documentation sur les ressources disponibles pour les équipes connexes.
La disponibilité d'une telle documentation simplifie considérablement l'intégration des services: vous pouvez immédiatement voir avec quels canaux et formats de messages avec lesquels l'application fonctionne. Et surtout, cela ne coûtera rien - FastStream a déjà créé les documents pour vous!
FastStream (grâce à FastDepends ) a un système de gestion des dépendances similaire à pytest fixtures
et FastAPI Depends
en même temps. Les arguments de fonction déclarent quelles dépendances souhaitées sont nécessaires et un décorateur spécial les livre à partir de l'objet de contexte global.
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
Vous pouvez utiliser FastStream MQBrokers
sans application FastStream
. Commencez et arrêtez- les en fonction de la durée de vie de votre demande.
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
De plus, FastStream peut être utilisé dans le cadre de Fastapi .
Importez simplement un streamrouter dont vous avez besoin et déclarez le gestionnaire de messages avec le même @router.subscriber(...)
et @router.publisher(...)
décorateurs.
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
Plus de fonctionnalités d'intégration peuvent être trouvées ici
Veuillez montrer votre soutien et rester en contact par:
Donner à notre référentiel GitHub une étoile, et
Rejoindre notre serveur EN Discord
Rejoindre notre groupe RU Telegram
Votre soutien nous aide à rester en contact avec vous et nous encourage à continuer de développer et d'améliorer le cadre. Merci pour votre soutien!
Merci à toutes ces personnes incroyables qui ont amélioré le projet!