Mühelose Event -Stream -Integration für Ihre Dienste
Faststream vereinfacht den Prozess des Schreibens von Produzenten und Verbrauchern für Messing -Warteschlangen und behandelt die automatische Erzeugung von Parsing-, Networking- und Dokumentationsgenerierung.
Das Streaming -Microservices war noch nie einfacher. Faststream wurde mit Nachwuchsentwicklern entworfen und vereinfacht Ihre Arbeit und hält die Tür für fortschrittlichere Anwendungsfälle offen. Hier finden Sie einen Blick auf die Kernfunktionen, die FasteTstream zu einem Rahmen für moderne, datenzentrierte Microservices machen.
Mehrere Makler : Faststream bietet eine einheitliche API für mehrere Nachrichtenmakler ( Kafka , Rabbitmq , Nats , Redis -Unterstützung)
Pydantic Validierung : Nutzen Sie die Validierungsfunktionen von Pydantic, um eingehende Nachrichten zu serialisieren und zu validieren
Automatische Dokumente : Bleiben Sie mit der automatischen Asyncapi -Dokumentation voran
Intuitive : Support für Volltyp-Redakteur macht Ihre Entwicklungserfahrung reibungslos und fängt Fehler vor, bevor sie die Laufzeit erreichen
Leistungsstarkes Abhängigkeitsinjektionssystem : Verwalten Sie Ihre Serviceabhängigkeiten effizient mit dem integrierten DI-System von Faststream
Testbar : Unterstützt In-Memory-Tests und macht Ihre CI/CD-Pipeline schneller und zuverlässiger
Erweiterbar : Verwenden Sie Erweiterungen für Lebensdauer, benutzerdefinierte Serialisierung und Middleware
Integrationen : Faststream ist mit jedem gewünschten HTTP -Framework vollständig kompatibel (insbesondere Fastapi )
Das ist schneller Stream - einfach, effizient und leistungsstark. Egal, ob Sie gerade erst mit Streaming -Microservices beginnen oder skalieren möchten, Faststream hat Sie bedeckt.
Dokumentation : https://faststream.airt.ai/latest/
Faststream ist ein neues Paket, das auf den Ideen und Erfahrungen von Fastkafka und Propan basiert. Indem wir uns unseren Streitkräften anschließen, haben wir das Beste aus beiden Paketen aufgegriffen und eine einheitliche Möglichkeit erstellt, Dienste zu schreiben, die gestreamte Daten unabhängig vom zugrunde liegenden Protokoll verarbeiten können. Wir werden weiterhin beide Pakete unterhalten, aber neue Entwicklung wird in diesem Projekt sein. Wenn Sie einen neuen Service starten, ist dieses Paket die empfohlene Möglichkeit, dies zu tun.
Faststream arbeitet unter Linux- , MacOS- , Windows- und UNIX -Betriebssystemen. Sie können es wie gewohnt mit pip
installieren:
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
Standardmäßig verwendet Faststream Pydanticv2 in Rost , aber Sie können es manuell herabstufen, wenn Ihre Plattform keine Rostunterstützung hat - Faststream funktioniert auch korrekt mit Pydanticv1 .
Faststream -Makler bieten bequeme Funktionsdekorateure @broker.subscriber
und @broker.publisher
, damit Sie den tatsächlichen Prozess delegieren können:
Daten konsumieren und produzieren Daten zu Ereigniswarteschlangen und
JSON-kodierte Nachrichten dekodieren und codieren
Diese Dekorateure machen es einfach, die Verarbeitungslogik für Ihre Verbraucher und Hersteller anzugeben, sodass Sie sich auf die Kerngeschäftslogik Ihrer Anwendung konzentrieren können, ohne sich um die zugrunde liegende Integration zu kümmern.
Außerdem verwendet Faststream Pydantic, um Eingabe-JSON-kodierte Daten in Python-Objekte zu analysieren, sodass Sie mit strukturierten Daten in Ihren Anwendungen einfach arbeiten können, sodass Sie Ihre Eingabemeldungen serialisieren können, nur mit Typ Anmerkungen.
Hier ist eine Beispielpython -App, die Faststream verwendet, die Daten aus einem eingehenden Datenstrom verbraucht und die Daten auf einen anderen ausgibt:
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
Mit BaseModel
-Klasse von Pydantic können Sie auch Nachrichten mithilfe einer deklarativen Syntax definieren, um die Felder und Typen Ihrer Nachrichten einfach anzugeben.
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
Der Dienst kann mit den TestBroker
-Kontextmanagern getestet werden, die den Broker standardmäßig in den "Testmodus" versetzt.
Der Tester leitet Ihren subscriber
und publisher
die Funktionen dekoriert haben, an die Inmemory -Broker um, sodass Sie Ihre App schnell testen können, ohne dass ein laufender Broker und alle Abhängigkeiten erforderlich sind.
Mit PyTest würde der Test für unseren Service so aussehen:
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
Die Anwendung kann mit dem integrierten Faststream -CLI-Befehl gestartet werden.
Installieren Sie vor dem Ausführen des Dienstes Faststream CLI mit dem folgenden Befehl:
pip install " faststream[cli] "
Verwenden Sie zum Ausführen des Dienstes den Befehl Faststream CLI und übergeben Sie das Modul (in diesem Fall die Datei, in der sich die App -Implementierung befindet) und das App -Symbol an den Befehl.
faststream run basic:app
Nach dem Ausführen des Befehls sollten Sie die folgende Ausgabe sehen:
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
Außerdem bietet Ihnen Faststream eine großartige Hot -Reload -Funktion, um Ihre Entwicklungserfahrung zu verbessern
faststream run basic:app --reload
Und multiprozessierende horizontale Skalierungsfunktion auch:
faststream run basic:app --workers 3
Sie können hier mehr über CLI -Funktionen erfahren
Faststream generiert automatisch Dokumentation für Ihr Projekt gemäß der Asyncapi -Spezifikation. Sie können mit beiden generierten Artefakten zusammenarbeiten und eine Webansicht Ihrer Dokumentation zu Ressourcen für verwandte Teams stellen.
Die Verfügbarkeit einer solchen Dokumentation vereinfacht die Integration von Diensten erheblich: Sie können sofort sehen, mit welchen Kanälen und Nachrichtenformaten die Anwendung arbeitet. Und vor allem wird es nichts kosten - Faststream hat die Dokumente bereits für Sie erstellt!
FastStream (dank FastDepends ) verfügt über ein Abhängigkeitsmanagementsystem ähnlich wie bei pytest fixtures
und FastAPI Depends
gleichzeitig ab. Funktionsargumente erklären, welche Abhängigkeiten Sie benötigen, und ein spezieller Dekorateur liefert sie aus dem globalen Kontextobjekt.
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
Sie können Faststream MQBrokers
ohne FastStream
-Anwendung verwenden. Starten Sie einfach und stoppen Sie sie entsprechend der Lebensdauer Ihrer Bewerbung.
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
Außerdem kann Faststream als Teil von Fastapi verwendet werden.
Importieren Sie einfach einen Streamrouter, den Sie benötigen, und deklarieren Sie den Nachrichtenhandler mit demselben @router.subscriber(...)
und @router.publisher(...)
Dekoratoren.
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
Weitere Integrationsfunktionen finden Sie hier
Bitte zeigen Sie Ihre Unterstützung und bleiben Sie in Kontakt von:
Wir geben unserem Github -Repository einen Stern und
Beitritt zu unserem En -Discord -Server
Schließen Sie sich unserer Ru Telegram -Gruppe an
Ihre Unterstützung hilft uns, mit Ihnen in Kontakt zu bleiben und uns zu ermutigen, den Rahmen weiter zu entwickeln und zu verbessern. Vielen Dank für Ihre Unterstützung!
Vielen Dank an all diese tollen Leute, die das Projekt besser gemacht haben!