이 라이브러리는 더 이상 사용되지 않으며 더 이상 관리되거나 지원되지 않습니다. 현재 활성화된 커뮤니티 프로젝트는 https://github.com/faust-streaming/faust에서 확인할 수 있습니다.
버전: | 1.10.4 |
---|---|
편물: | http://faust.readthedocs.io/ |
다운로드: | http://pypi.org/project/faust |
원천: | http://github.com/robinhood/faust |
키워드: | 분산, 스트림, 비동기, 처리, 데이터, 대기열, 상태 관리 |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust는 Kafka Streams의 아이디어를 Python으로 포팅하는 스트림 처리 라이브러리입니다.
Robinhood에서는 매일 수십억 개의 이벤트를 처리하는 고성능 분산 시스템과 실시간 데이터 파이프라인을 구축하는 데 사용됩니다.
Faust는 스트림 처리 와 이벤트 처리를 모두 제공하며 Kafka Streams, Apache Spark/Storm/Samza/Flink와 같은 도구와 유사성을 공유합니다.
DSL을 사용하지 않고 단지 Python입니다! 즉, 스트림 처리 시 NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++ 등 즐겨 사용하는 Python 라이브러리를 모두 사용할 수 있습니다.
Faust에서는 새로운 async/await 구문과 변수 유형 주석을 위해 Python 3.6 이상이 필요합니다.
다음은 들어오는 주문 스트림을 처리하는 예입니다.
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
에이전트 데코레이터는 기본적으로 Kafka 주제에서 소비하고 수신되는 모든 이벤트에 대해 작업을 수행하는 "스트림 프로세서"를 정의합니다.
에이전트는 async def
기능이므로 웹 요청과 같은 다른 작업을 비동기적으로 수행할 수도 있습니다.
이 시스템은 데이터베이스처럼 작동하여 상태를 유지할 수 있습니다. 테이블은 일반 Python 사전으로 사용할 수 있는 분산 키/값 저장소로 명명됩니다.
테이블은 C++로 작성된 RocksDB라는 초고속 임베디드 데이터베이스를 사용하여 각 시스템에 로컬로 저장됩니다.
또한 테이블은 선택적으로 "기간 설정"된 집계 수를 저장할 수 있으므로 "마지막 날의 클릭 수" 또는 "지난 시간의 클릭 수"를 추적할 수 있습니다. 예를 들어. Kafka Streams와 마찬가지로 우리는 텀블링, 호핑 및 슬라이딩 시간 창을 지원하며 이전 창을 만료하여 데이터가 채워지는 것을 방지할 수 있습니다.
안정성을 위해 Kafka 주제를 "write-ahead-log"로 사용합니다. 키가 변경될 때마다 변경 로그에 게시됩니다. 대기 노드는 이 변경 로그를 사용하여 데이터의 정확한 복제본을 유지하고 노드 중 하나가 실패할 경우 즉시 복구를 가능하게 합니다.
사용자에게 테이블은 단지 사전이지만 데이터는 다시 시작하는 동안에도 유지되고 노드 간에 복제되므로 장애 조치 시 다른 노드가 자동으로 인계받을 수 있습니다.
URL별로 페이지 조회수를 계산할 수 있습니다.
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
Kafka 주제로 전송된 데이터는 분할됩니다. 즉, 동일한 URL에 대한 모든 횟수가 동일한 Faust 작업자 인스턴스로 전달되는 방식으로 클릭이 URL별로 분할됩니다.
Faust는 바이트, 유니코드, 직렬화된 구조 등 모든 유형의 스트림 데이터를 지원하지만 최신 Python 구문을 사용하여 스트림의 키와 값이 직렬화되는 방식을 설명하는 "모델"도 함께 제공됩니다.
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust는 mypy
유형 검사기를 사용하여 정적으로 유형이 지정되므로 애플리케이션을 작성할 때 정적 유형을 활용할 수 있습니다.
Faust 소스 코드는 작고 잘 구성되어 있으며 Kafka Streams 구현을 학습하는 데 좋은 리소스 역할을 합니다.
파우스트는 사용하기 매우 쉽습니다. 다른 스트림 처리 솔루션을 사용하려면 복잡한 hello-world 프로젝트와 인프라 요구 사항이 있어야 합니다. Faust에는 Kafka만 필요하고 나머지는 Python만 필요하므로 Python을 알고 있다면 이미 Faust를 사용하여 스트림 처리를 수행할 수 있으며 거의 모든 것과 통합할 수 있습니다.
다음은 여러분이 만들 수 있는 더 쉬운 응용 프로그램 중 하나입니다.
수입 파우스트 클래스 인사말(faust.Record): from_name: 문자열 to_name: 문자열 앱 = faust.App('hello-app',broker='kafka://localhost') 주제 = app.topic('hello-topic', value_type=인사말) @app.agent(주제) 비동기 def 안녕하세요(인사말): 인사말의 인사말에 대한 비동기: print(f'{greeting.from_name}님이 {greeting.to_name}님에게 안녕하세요') @app.timer(간격=1.0) 비동기 def example_sender(app): 안녕하세요.보내기( value=인사말(from_name='파우스트', to_name='당신'), ) __name__ == '__main__'인 경우: app.main()
async 및 wait 키워드에 약간 겁이 날 수도 있지만 Faust를 사용하기 위해 asyncio
어떻게 작동하는지 알 필요는 없습니다. 예제를 흉내만 내면 괜찮을 것입니다.
예제 애플리케이션은 두 가지 작업을 시작합니다. 하나는 스트림을 처리하는 것이고, 다른 하나는 이벤트를 해당 스트림으로 보내는 백그라운드 스레드입니다. 실제 애플리케이션에서 시스템은 프로세서가 사용할 수 있는 Kafka 주제에 이벤트를 게시하며, 백그라운드 스레드는 예제에 데이터를 공급하는 데만 필요합니다.
Python Package Index(PyPI)를 통해 또는 소스에서 Faust를 설치할 수 있습니다.
pip를 사용하여 설치하려면:
$ pip install -U faust
Faust는 또한 Faust와 특정 기능에 대한 종속성을 설치하는 데 사용할 수 있는 setuptools
확장 그룹을 정의합니다.
요구 사항이나 pip
명령줄에서 대괄호를 사용하여 이를 지정할 수 있습니다. 여러 번들은 쉼표를 사용하여 구분합니다.
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
다음 번들이 사용 가능합니다:
faust[rocksdb] : | Faust 테이블 상태를 저장하기 위해 RocksDB를 사용합니다. 생산에 권장됩니다. |
---|
faust[redis] : | Redis_를 간단한 캐싱 백엔드로 사용하기 위한 것입니다(Memcached 스타일). |
---|
faust[yaml] : | 스트림에서 YAML 및 PyYAML 라이브러리를 사용합니다. |
---|
faust[fast] : | 사용 가능한 모든 C 속도 향상 확장을 Faust 코어에 설치합니다. |
---|
faust[datadog] : | Datadog Faust 모니터를 사용하기 위해. |
---|---|
faust[statsd] : | Statsd Faust 모니터를 사용하기 위한 것입니다. |
faust[uvloop] : | uvloop 와 함께 Faust를 사용하는 경우. |
---|---|
faust[eventlet] : | eventlet 과 함께 파우스트를 사용하기 위해 |
faust[debug] : | aiomonitor 사용하여 실행 중인 Faust 작업자를 연결하고 디버깅합니다. |
---|---|
faust[setproctitle] : | setproctitle 모듈이 설치되면 Faust 작업자는 이를 사용하여 ps / top 목록에 더 좋은 프로세스 이름을 설정합니다. 또한 fast 및 debug 번들과 함께 설치됩니다. |
http://pypi.org/project/faust에서 최신 버전의 Faust를 다운로드하세요.
다음을 수행하여 설치할 수 있습니다.
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
현재 virtualenv를 사용하지 않는 경우 마지막 명령은 권한이 있는 사용자로 실행되어야 합니다.
다음 pip
명령을 사용하여 Faust의 최신 스냅샷을 설치할 수 있습니다.
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
예! asyncio
와 통합하려면 eventlet
브리지로 사용하세요.
eventlet
사용 이 접근 방식은 eventlet
과 함께 작동할 수 있는 모든 차단 Python 라이브러리에서 작동합니다.
eventlet
사용하려면 aioeventlet
모듈을 설치해야 하며 이를 Faust와 함께 번들로 설치할 수 있습니다.
$ pip install -U faust[eventlet]
그런 다음 실제로 이벤트 루프를 이벤트 루프로 사용하려면 faust
프로그램에 -L <faust --loop>
인수를 사용해야 합니다.
$ faust -L eventlet -A myproj worker -l info
또는 진입점 스크립트 상단에 import mode.loop.eventlet
추가하세요.
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
경고
이것이 모듈의 맨 위에 있고 라이브러리를 가져오기 전에 실행된다는 것이 매우 중요합니다.
예! tornado.platform.asyncio
브리지를 사용하세요: http://www.tornadoweb.org/en/stable/asyncio.html
예! asyncio
Reactor 구현 사용: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
아니요. Faust에는 Python 3.6에 도입된 기능(비동기, 대기, 변수 유형 주석)을 많이 사용하므로 Python 3.6 이상이 필요합니다.
최대 열린 파일 수에 대한 제한을 늘려야 할 수도 있습니다. 다음 게시물에서는 OS X에서 이를 수행하는 방법을 설명합니다: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust는 버전이 0.10 이상인 kafka를 지원합니다.
Faust의 활용, 개발, 미래에 대한 토론을 원하시면 `fauststream`_ Slack에 참여해주세요.
제안 사항, 버그 보고서 또는 불편한 사항이 있는 경우 https://github.com/robinhood/faust/issues/의 문제 추적기에 보고해 주세요.
이 소프트웨어는 새로운 BSD 라이선스에 따라 라이선스가 부여됩니다. 전체 라이센스 텍스트는 최상위 배포 디렉토리에 있는 LICENSE
파일을 참조하세요.
Faust 개발은 GitHub에서 이루어집니다: https://github.com/robinhood/faust
Faust 개발에 참여해 보시기 바랍니다.
문서의 Contributing to Faust 섹션도 읽어보세요.
프로젝트의 코드 베이스, 이슈 트래커, 채팅방, 메일링 리스트에서 상호 작용하는 모든 사람은 파우스트 행동 강령을 따라야 합니다.
이러한 프로젝트의 기여자이자 유지관리자로서 개방적이고 환영받는 커뮤니티를 조성하기 위해 우리는 문제 보고, 기능 요청 게시, 문서 업데이트, 풀 요청 또는 패치 제출 및 기타 활동을 통해 기여하는 모든 사람을 존중할 것을 약속합니다.
우리는 이러한 프로젝트에 경험 수준, 성별, 성 정체성 및 표현, 성적 취향, 장애, 외모, 신체 크기, 인종, 민족, 나이, 종교 등에 관계없이 모든 사람이 괴롭힘 없는 경험을 할 수 있도록 최선을 다하고 있습니다. 국적.
참가자가 용납할 수 없는 행동의 예는 다음과 같습니다.
프로젝트 관리자는 본 행동 강령에 부합하지 않는 댓글, 커밋, 코드, Wiki 편집, 문제 및 기타 기여를 제거, 편집 또는 거부할 권리와 책임이 있습니다. 이 행동 강령을 채택함으로써 프로젝트 유지관리자는 이 원칙을 이 프로젝트 관리의 모든 측면에 공정하고 일관되게 적용할 것을 약속합니다. 행동 강령을 따르거나 집행하지 않는 프로젝트 유지관리자는 프로젝트 팀에서 영구적으로 제외될 수 있습니다.
이 행동 강령은 개인이 프로젝트나 해당 커뮤니티를 대표할 때 프로젝트 공간과 공공 장소 모두에 적용됩니다.
욕설, 괴롭힘 또는 기타 용납할 수 없는 행동의 사례는 문제를 공개하거나 프로젝트 관리자 중 한 명 이상에게 연락하여 보고할 수 있습니다.
본 행동 강령은 http://contributor-covenant.org/version/1/2/0/에서 제공되는 기여자 규약 버전 1.2.0에서 수정되었습니다.