Integrasi aliran acara yang mudah untuk layanan Anda
Faststream menyederhanakan proses penulisan produsen dan konsumen untuk antrian pesan, menangani semua pembuatan parsing, jaringan, dan dokumentasi secara otomatis.
Membuat streaming layanan microser tidak pernah semudah ini. Dirancang dengan mempertimbangkan pengembang junior, Faststream menyederhanakan pekerjaan Anda sambil menjaga pintu tetap terbuka untuk kasus penggunaan yang lebih lanjut. Berikut ini adalah fitur inti yang menjadikan Faststream sebagai kerangka kerja untuk microservices modern yang berpusat pada data.
Beberapa broker : Faststream menyediakan API terpadu untuk bekerja di berbagai broker pesan ( Kafka , Rabbitmq , NATS , Dukungan Redis )
Validasi Pydantic : Leverage kemampuan validasi Pydantic untuk membuat serialisasi dan memvalidasi pesan yang masuk
Dokumen Otomatis : Tetap Depan dengan Dokumentasi Asyncapi Otomatis
Intuitif : Dukungan editor yang diketik penuh membuat pengalaman pengembangan Anda lancar dan menangkap kesalahan sebelum mereka mencapai runtime
Sistem Injeksi Ketergantungan yang Kuat : Kelola dependensi layanan Anda secara efisien dengan sistem DI bawaan Faststream
Testable : Mendukung tes dalam memori, membuat pipa CI/CD Anda lebih cepat dan lebih andal
Extensible : Gunakan ekstensi untuk rentang hidup, serialisasi khusus dan middleware
Integrasi : Faststream sepenuhnya kompatibel dengan kerangka HTTP apa pun yang Anda inginkan (terutama Fastapi )
Singkatnya, faststream itu - mudah, efisien, dan kuat. Apakah Anda baru memulai dengan streaming layanan microser atau ingin skala, Faststream telah membuat Anda tertutup.
Dokumentasi : https://faststream.airt.ai/latest/
Faststream adalah paket baru berdasarkan ide dan pengalaman yang diperoleh dari Fastkafka dan Propane . Dengan bergabung dengan pasukan kami, kami mengambil yang terbaik dari kedua paket dan menciptakan cara terpadu untuk menulis layanan yang mampu memproses data streaming terlepas dari protokol yang mendasarinya. Kami akan terus mempertahankan kedua paket, tetapi pengembangan baru akan ada di proyek ini. Jika Anda memulai layanan baru, paket ini adalah cara yang disarankan untuk melakukannya.
Faststream bekerja di Linux , MacOS , Windows , dan sistem operasi gaya paling unix . Anda dapat menginstalnya dengan pip
seperti biasa:
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
Secara default FastStream menggunakan PyDanticV2 yang ditulis dengan karat , tetapi Anda dapat menurunkannya secara manual, jika platform Anda tidak memiliki dukungan karat - Faststream akan bekerja dengan benar dengan PydanticV1 juga.
Faststream Brokers menyediakan dekorator fungsi yang nyaman @broker.subscriber
dan @broker.publisher
untuk memungkinkan Anda mendelegasikan proses aktual:
mengkonsumsi dan memproduksi data ke antrian acara, dan
decoding dan pengkodean pesan yang dikodekan JSON
Dekorator ini memudahkan untuk menentukan logika pemrosesan untuk konsumen dan produsen Anda, yang memungkinkan Anda untuk fokus pada logika bisnis inti aplikasi Anda tanpa khawatir tentang integrasi yang mendasarinya.
Juga, Faststream menggunakan Pydantic untuk menguraikan input data yang dikodekan JSON ke dalam objek Python, membuatnya mudah untuk bekerja dengan data terstruktur dalam aplikasi Anda, sehingga Anda dapat membuat serial pesan input Anda hanya menggunakan anotasi tipe.
Berikut adalah contoh aplikasi Python menggunakan Faststream yang mengkonsumsi data dari aliran data yang masuk dan mengeluarkan data ke yang lain:
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
Juga, kelas BaseModel
Pydantic memungkinkan Anda untuk mendefinisikan pesan menggunakan sintaks deklaratif, membuatnya mudah untuk menentukan bidang dan jenis pesan Anda.
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
Layanan ini dapat diuji menggunakan manajer konteks TestBroker
, yang, secara default, menempatkan broker ke dalam "mode pengujian".
Penguji akan mengarahkan kembali fungsi subscriber
dan publisher
Anda yang didekorasi ke broker inmemory, memungkinkan Anda untuk dengan cepat menguji aplikasi Anda tanpa perlu broker yang sedang berjalan dan semua dependensinya.
Menggunakan Pytest, tes untuk layanan kami akan terlihat seperti ini:
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
Aplikasi ini dapat mulai menggunakan perintah Faststream CLI bawaan.
Sebelum menjalankan layanan, instal Faststream CLI menggunakan perintah berikut:
pip install " faststream[cli] "
Untuk menjalankan Layanan, gunakan perintah Faststream CLI dan lulus modul (dalam hal ini, file tempat implementasi aplikasi berada) dan simbol aplikasi ke perintah.
faststream run basic:app
Setelah menjalankan perintah, Anda akan melihat output berikut:
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
Juga, Faststream memberi Anda fitur pemuatan ulang panas yang hebat untuk meningkatkan pengalaman pengembangan Anda
faststream run basic:app --reload
Dan fitur penskalaan horizontal multiprosessing juga:
faststream run basic:app --workers 3
Anda dapat mempelajari lebih lanjut tentang fitur CLI di sini
Faststream secara otomatis menghasilkan dokumentasi untuk proyek Anda sesuai dengan spesifikasi ASYNCAPI . Anda dapat bekerja dengan artefak yang dihasilkan dan menempatkan tampilan web dokumentasi Anda tentang sumber daya yang tersedia untuk tim terkait.
Ketersediaan dokumentasi tersebut secara signifikan menyederhanakan integrasi layanan: Anda dapat segera melihat saluran dan format pesan apa yang dilakukan aplikasi. Dan yang paling penting, itu tidak akan dikenakan biaya apa pun - Faststream telah menciptakan dokumen untuk Anda!
Faststream (terima kasih kepada FastDepends ) memiliki sistem manajemen ketergantungan yang mirip dengan pytest fixtures
dan FastAPI Depends
pada saat yang sama. Argumen fungsi menyatakan dependensi mana yang Anda inginkan diperlukan, dan dekorator khusus mengirimkannya dari objek konteks global.
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
Anda dapat menggunakan faststream MQBrokers
tanpa aplikasi FastStream
. Mulailah dan hentikan mereka sesuai dengan umur aplikasi Anda.
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
Juga, Faststream dapat digunakan sebagai bagian dari Fastapi .
Cukup impor streamRouter yang Anda butuhkan dan nyatakan penangan pesan dengan @router.subscriber(...)
dan @router.publisher(...)
dekorator.
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
Lebih banyak fitur integrasi dapat ditemukan di sini
Tolong tunjukkan dukungan Anda dan tetap berhubungan dengan:
Memberi repositori github kami bintang, dan
Bergabung dengan End Server kami
Bergabung dengan Grup Telegram RU kami
Dukungan Anda membantu kami untuk tetap berhubungan dengan Anda dan mendorong kami untuk terus mengembangkan dan meningkatkan kerangka kerja. Terima kasih atas dukungan Anda!
Terima kasih untuk semua orang luar biasa yang membuat proyek ini lebih baik!