서비스를위한 손쉬운 이벤트 스트림 통합
Faststream은 메시지 대기열에 대한 생산자 및 소비자를 작성하는 프로세스를 단순화하고 모든 구문 분석, 네트워킹 및 문서 생성을 자동으로 처리합니다.
스트리밍 마이크로 서비스를 만드는 것은 결코 쉬운 일이 아닙니다. 주니어 개발자를 염두에두고 설계된 Faststream은 보다 고급 사용 사례를 위해 문을 열어 두면서 작업을 단순화합니다. 다음은 FastStream을 현대적인 데이터 중심 마이크로 서비스를위한 프레임 워크로 만드는 핵심 기능을 살펴보십시오.
다중 중개인 : Faststream은 여러 메시지 중개인 ( Kafka , Rabbitmq , Nats , Redis Support)에서 작동하는 통합 API를 제공합니다.
Pydantic Validation : Pydantic의 검증 기능을 활용하여 들어오는 메시지를 직렬화하고 검증합니다.
자동 문서 : 자동 AsynCapi 문서를 통해 앞서 나가십시오
직관적 : 완전한 편집자 지원은 개발 경험을 매끄럽게 만듭니다.
강력한 종속성 주입 시스템 : Faststream 의 내장 DI 시스템을 통해 서비스 종속성을 효율적으로 관리합니다.
테스트 가능 : 메모리 내 테스트를 지원하여 CI/CD 파이프 라인을보다 빠르고 신뢰할 수 있도록합니다.
확장 가능 : 수명, 사용자 정의 직렬화 및 미들웨어에 대한 확장 사용
통합 : FastStream은 원하는 HTTP 프레임 워크와 완전히 호환됩니다 (특히 FastApi )
그것은 간단하고 효율적이며 강력한 간단히 말해서 빠른 스트림 입니다. 스트리밍 마이크로 서비스로 시작하든 스케일을 찾고 있든 FastStream이 당신을 덮었습니다.
문서 : https://faststream.airt.ai/latest/
Faststream 은 Fastkafka 와 Propan 에서 얻은 아이디어와 경험을 기반으로 한 새로운 패키지입니다. 우리는 힘에 합류하여 두 패키지 모두에서 최고를 선택하고 기본 프로토콜에 관계없이 스트리밍 된 데이터를 처리 할 수있는 서비스를 작성하는 통합 방법을 만들었습니다. 우리는 두 패키지를 계속 유지하지만이 프로젝트에는 새로운 개발이있을 것입니다. 새 서비스를 시작하는 경우이 패키지가 권장되는 방법입니다.
Faststream은 Linux , MacOS , Windows 및 대부분의 Unix 스타일 운영 체제에서 작동합니다. 평소와 같이 pip
로 설치할 수 있습니다.
pip install faststream[kafka]
# or
pip install faststream[rabbit]
# or
pip install faststream[nats]
# or
pip install faststream[redis]
기본적으로 Faststream을 사용하여 Rust 로 작성된 pydanticv2를 사용하지만 플랫폼에 Rust 지원이없는 경우 수동으로 다운 그레이드 할 수 있습니다. FastStream은 PydanticV1 에서도 올바르게 작동합니다.
FastStream Brokers는 편리한 기능 Decorators @broker.subscriber
및 @broker.publisher
제공하여 실제 프로세스를 위임 할 수 있습니다.
이벤트 대기열에 데이터를 소비하고 생성합니다
JSON 인코딩 된 메시지 디코딩 및 인코딩
이 데코레이터를 사용하면 소비자 및 생산자를위한 처리 로직을 쉽게 지정할 수 있으므로 기본 통합에 대해 걱정하지 않고 응용 프로그램의 핵심 비즈니스 논리에 집중할 수 있습니다.
또한 FastStream은 Pydantic을 사용하여 JSON에 인코딩 된 데이터를 Python 개체에 구문 분석하여 응용 프로그램에서 구조화 된 데이터로 쉽게 작업 할 수 있으므로 유형 주석을 사용하여 입력 메시지를 직렬화 할 수 있습니다.
다음은 들어오는 데이터 스트림의 데이터를 소비하고 데이터를 다른 방법으로 출력하는 FastStream을 사용하는 Python App의 예입니다.
from faststream import FastStream
from faststream . kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
broker = KafkaBroker ( "localhost:9092" )
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
app = FastStream ( broker )
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( user : str , user_id : int ) -> str :
return f"User: { user_id } - { user } registered"
또한 Pydantic 의 BaseModel
클래스를 사용하면 선언문 구문을 사용하여 메시지를 정의 할 수 있으므로 메시지의 필드 및 유형을 쉽게 지정할 수 있습니다.
from pydantic import BaseModel , Field , PositiveInt
from faststream import FastStream
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
class User ( BaseModel ):
user : str = Field (..., examples = [ "John" ])
user_id : PositiveInt = Field (..., examples = [ "1" ])
@ broker . subscriber ( "in" )
@ broker . publisher ( "out" )
async def handle_msg ( data : User ) -> str :
return f"User: { data . user } - { data . user_id } registered"
이 서비스는 기본적으로 브로커를 "테스트 모드"로 만드는 TestBroker
Context Manager를 사용하여 테스트 할 수 있습니다.
테스터는 subscriber
와 publisher
장식 기능을 Inmemory 브로커로 리디렉션하여 실행중인 브로커 및 모든 종속성없이 앱을 신속하게 테스트 할 수 있습니다.
Pytest를 사용하면 서비스 테스트가 다음과 같습니다.
# Code above omitted ?
import pytest
import pydantic
from faststream . kafka import TestKafkaBroker
@ pytest . mark . asyncio
async def test_correct ():
async with TestKafkaBroker ( broker ) as br :
await br . publish ({
"user" : "John" ,
"user_id" : 1 ,
}, "in" )
@ pytest . mark . asyncio
async def test_invalid ():
async with TestKafkaBroker ( broker ) as br :
with pytest . raises ( pydantic . ValidationError ):
await br . publish ( "wrong message" , "in" )
내장 된 FastStream CLI 명령을 사용하여 응용 프로그램을 시작할 수 있습니다.
서비스를 실행하기 전에 다음 명령을 사용하여 FastStream CLI를 설치하십시오.
pip install " faststream[cli] "
서비스를 실행하려면 FastStream CLI 명령을 사용하고 모듈 (이 경우 앱 구현이있는 파일)과 앱 기호를 명령에 전달하십시오.
faststream run basic:app
명령을 실행하면 다음 출력이 표시됩니다.
INFO - FastStream app starting...
INFO - input_data | - ` HandleMsg ` waiting for messages
INFO - FastStream app started successfully ! To exit press CTRL+C
또한 Faststream은 개발 경험을 향상시키기위한 훌륭한 핫 재 장전 기능을 제공합니다.
faststream run basic:app --reload
다중 프로세싱 수평 스케일링 기능도 다음과 같습니다.
faststream run basic:app --workers 3
CLI 기능에 대한 자세한 내용은 여기를 참조하십시오
FastStream은 Asyncapi 사양에 따라 프로젝트에 대한 문서를 자동으로 생성합니다. 생성 된 아티팩트와 함께 작업하고 관련 팀이 이용할 수있는 리소스에 대한 문서의 웹보기를 배치 할 수 있습니다.
이러한 문서의 가용성은 서비스의 통합을 크게 단순화합니다. 응용 프로그램과 함께 작동하는 채널과 메시지 형식을 즉시 확인할 수 있습니다. 그리고 가장 중요한 것은 비용이 많이 들지 않을 것입니다. Faststream은 이미 당신을 위해 문서를 만들었습니다!
FastStream ( FastDepends 덕분에)에는 pytest fixtures
과 유사한 종속성 관리 시스템이 있으며 FastAPI Depends
. 함수 인수는 원하는 의존성이 필요한 의존성을 선언하고 특수 데코레이터는 글로벌 컨텍스트 객체에서이를 전달합니다.
from faststream import Depends , Logger
async def base_dep ( user_id : int ) -> bool :
return True
@ broker . subscriber ( "in-test" )
async def base_handler ( user : str ,
logger : Logger ,
dep : bool = Depends ( base_dep )):
assert dep is True
logger . info ( user )
FastStream
응용 프로그램없이 FastStream MQBrokers
사용할 수 있습니다. 응용 프로그램의 수명에 따라 시작 하고 중지하십시오 .
from aiohttp import web
from faststream . kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
@ broker . subscriber ( "test" )
async def base_handler ( body ):
print ( body )
async def start_broker ( app ):
await broker . start ()
async def stop_broker ( app ):
await broker . close ()
async def hello ( request ):
return web . Response ( text = "Hello, world" )
app = web . Application ()
app . add_routes ([ web . get ( "/" , hello )])
app . on_startup . append ( start_broker )
app . on_cleanup . append ( stop_broker )
if __name__ == "__main__" :
web . run_app ( app )
또한 Faststream은 Fastapi 의 일부로 사용할 수 있습니다.
필요한 스트림 루터를 가져 와서 동일한 @router.subscriber(...)
및 @router.publisher(...)
데코레이터로 메시지 핸들러를 선언합니다.
from fastapi import FastAPI
from pydantic import BaseModel
from faststream . kafka . fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
class Incoming ( BaseModel ):
m : dict
@ router . subscriber ( "test" )
@ router . publisher ( "response" )
async def hello ( m : Incoming ):
return { "response" : "Hello, world!" }
app = FastAPI ()
app . include_router ( router )
더 많은 통합 기능이 여기에서 찾을 수 있습니다
지원을 보여주고 연락을 유지하십시오.
우리의 github 리포지토리를 별에 제공하고
EN Discord 서버 가입
RU Telegram 그룹에 가입
귀하의 지원은 우리가 귀하와 연락을 유지하고 프레임 워크를 계속 개발하고 개선하도록 권장합니다. 지원해 주셔서 감사합니다!
프로젝트를 개선 한이 놀라운 사람들 덕분에!