ไลบรารีนี้เลิกใช้แล้วและไม่ได้รับการจัดการหรือรองรับอีกต่อไป โครงการชุมชนที่ใช้งานในปัจจุบันสามารถพบได้ที่ https://github.com/faust-streaming/faust
เวอร์ชัน: | 1.10.4 |
---|---|
เว็บ: | http://faust.readthedocs.io/ |
ดาวน์โหลด: | http://pypi.org/project/faust |
แหล่งที่มา: | http://github.com/robinhood/faust |
คำสำคัญ: | แบบกระจาย สตรีม อะซิงก์ การประมวลผล ข้อมูล คิว การจัดการสถานะ |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust เป็นไลบรารีการประมวลผลสตรีมที่ย้ายแนวคิดจาก Kafka Streams ไปยัง Python
มันถูกใช้ที่ Robinhood เพื่อสร้างระบบกระจายประสิทธิภาพสูงและไปป์ไลน์ข้อมูลแบบเรียลไทม์ที่ประมวลผลเหตุการณ์นับพันล้านรายการทุกวัน
Faust ให้ทั้ง การประมวลผลสตรีม และ การประมวลผลเหตุการณ์ แบ่งปันความคล้ายคลึงกันกับเครื่องมือต่างๆ เช่น Kafka Streams, Apache Spark/Storm/Samza/Flink,
มันไม่ได้ใช้ DSL มันเป็นแค่ Python! ซึ่งหมายความว่าคุณสามารถใช้ไลบรารี Python ที่คุณชื่นชอบทั้งหมดได้เมื่อประมวลผลสตรีม: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++
Faust ต้องการ Python 3.6 หรือใหม่กว่าสำหรับไวยากรณ์ async/await ใหม่และคำอธิบายประกอบประเภทตัวแปร
นี่คือตัวอย่างการประมวลผลกระแสคำสั่งซื้อที่เข้ามา:
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
เครื่องมือตกแต่งตัวแทนกำหนด "ตัวประมวลผลสตรีม" ที่บริโภคจากหัวข้อคาฟคาเป็นหลักและดำเนินการบางอย่างกับทุกเหตุการณ์ที่ได้รับ
เอเจนต์เป็นฟังก์ชัน async def
ดังนั้นจึงสามารถทำการดำเนินการอื่นๆ แบบอะซิงโครนัสได้ เช่น คำขอเว็บ
ระบบนี้สามารถคงสถานะไว้ได้ โดยทำหน้าที่เหมือนกับฐานข้อมูล ตารางมีชื่อว่าที่เก็บคีย์/ค่าแบบกระจายที่คุณสามารถใช้เป็นพจนานุกรม Python ทั่วไปได้
ตารางจะถูกจัดเก็บไว้ในเครื่องแต่ละเครื่องโดยใช้ฐานข้อมูลแบบฝังที่รวดเร็วเป็นพิเศษซึ่งเขียนด้วยภาษา C ++ ที่เรียกว่า RocksDB
ตารางยังสามารถจัดเก็บจำนวนรวมที่เป็นทางเลือก "แบบหน้าต่าง" เพื่อให้คุณสามารถติดตาม "จำนวนคลิกจากวันสุดท้าย" หรือ "จำนวนคลิกในชั่วโมงที่ผ่านมา" ตัวอย่างเช่น. เช่นเดียวกับ Kafka Streams เรารองรับการพลิกผัน การกระโดด และการเลื่อนหน้าต่างของเวลา และหน้าต่างเก่าสามารถหมดอายุได้เพื่อหยุดข้อมูลไม่ให้เต็ม
เพื่อความน่าเชื่อถือ เราใช้หัวข้อ Kafka เป็น "บันทึกล่วงหน้า" เมื่อใดก็ตามที่มีการเปลี่ยนแปลงคีย์ เราจะเผยแพร่ไปยังบันทึกการเปลี่ยนแปลง โหนดสแตนด์บายใช้จากบันทึกการเปลี่ยนแปลงนี้เพื่อเก็บแบบจำลองข้อมูลที่แน่นอน และเปิดใช้งานการกู้คืนได้ทันทีหากโหนดใดโหนดหนึ่งล้มเหลว
สำหรับผู้ใช้ ตารางเป็นเพียงพจนานุกรม แต่ข้อมูลจะยังคงอยู่ระหว่างการรีสตาร์ทและจำลองแบบข้ามโหนด ดังนั้นเมื่อเกิดข้อผิดพลาด โหนดอื่นๆ จึงสามารถเข้าควบคุมได้โดยอัตโนมัติ
คุณสามารถนับการดูหน้าเว็บตาม URL:
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
ข้อมูลที่ส่งไปยังหัวข้อ Kafka จะถูกแบ่งพาร์ติชัน ซึ่งหมายความว่าการคลิกจะถูกแบ่งส่วนโดย URL ในลักษณะที่ทุกการนับสำหรับ URL เดียวกันจะถูกส่งไปยังอินสแตนซ์ของพนักงาน Faust เดียวกัน
Faust รองรับข้อมูลสตรีมทุกประเภท: ไบต์, Unicode และโครงสร้างซีเรียลไลซ์ แต่ยังมาพร้อมกับ "โมเดล" ที่ใช้ไวยากรณ์ Python สมัยใหม่เพื่ออธิบายวิธีการซีเรียลไลซ์คีย์และค่าในสตรีม:
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust มีการพิมพ์แบบคงที่โดยใช้ตัวตรวจสอบประเภท mypy
ดังนั้นคุณจึงสามารถใช้ประโยชน์จากประเภทคงที่เมื่อเขียนแอปพลิเคชัน
ซอร์สโค้ด Faust มีขนาดเล็ก มีการจัดระเบียบอย่างดี และทำหน้าที่เป็นแหล่งข้อมูลที่ดีสำหรับการเรียนรู้การใช้งาน Kafka Streams
เฟาสต์ใช้งานง่ายมาก ในการเริ่มต้นใช้โซลูชันการประมวลผลสตรีมอื่นๆ คุณต้องมีโปรเจ็กต์ Hello-World ที่ซับซ้อน และข้อกำหนดด้านโครงสร้างพื้นฐาน Faust ต้องการเพียง Kafka ส่วนที่เหลือเป็นเพียง Python ดังนั้นหากคุณรู้จัก Python คุณสามารถใช้ Faust เพื่อประมวลผลสตรีมได้แล้ว และมันสามารถรวมเข้ากับอะไรก็ได้
นี่คือหนึ่งในแอปพลิเคชันที่ง่ายกว่าที่คุณสามารถทำได้:
เฟาสต์นำเข้า คำทักทายในชั้นเรียน (faust.Record): from_name: str to_name: str app = faust.App('hello-app', โบรกเกอร์='kafka://localhost') topic = app.topic('hello-topic', value_type=คำทักทาย) @app.agent(หัวข้อ) async def สวัสดี (ทักทาย): async สำหรับการทักทายในการทักทาย: พิมพ์ (f'สวัสดีจาก {greeting.from_name} ถึง {greeting.to_name}') @app.timer(ช่วงเวลา=1.0) async def example_sender (แอป): รอสวัสดีส่ง ( value=ทักทาย(from_name='Faust', to_name='you'), - ถ้า __name__ == '__main__': แอพ. main()
คุณอาจรู้สึกหวาดกลัวเล็กน้อยกับ async และ await คีย์เวิร์ด แต่คุณไม่จำเป็นต้องรู้ว่า asyncio
ทำงานอย่างไรในการใช้ Faust เพียงเลียนแบบตัวอย่างแล้วคุณจะสบายดี
แอปพลิเคชันตัวอย่างเริ่มต้นงานสองงาน งานแรกกำลังประมวลผลสตรีม งานที่สองคือเธรดพื้นหลังที่ส่งเหตุการณ์ไปยังสตรีมนั้น ในแอปพลิเคชันในชีวิตจริง ระบบของคุณจะเผยแพร่กิจกรรมไปยังหัวข้อ Kafka ที่โปรเซสเซอร์ของคุณสามารถใช้ได้ และเธรดพื้นหลังจำเป็นสำหรับการป้อนข้อมูลลงในตัวอย่างของเราเท่านั้น
คุณสามารถติดตั้ง Faust ผ่าน Python Package Index (PyPI) หรือจากแหล่งที่มา
วิธีติดตั้งโดยใช้ pip:
$ pip install -U faust
เฟาสต์ยังกำหนดกลุ่มของส่วนขยาย setuptools
ที่สามารถใช้เพื่อติดตั้งเฟาสท์และการขึ้นต่อกันสำหรับคุณลักษณะที่กำหนด
คุณสามารถระบุสิ่งเหล่านี้ในความต้องการของคุณหรือในบรรทัดคำสั่ง pip
ได้โดยใช้วงเล็บเหลี่ยม แยกหลายกลุ่มโดยใช้เครื่องหมายจุลภาค:
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
มีบันเดิลดังต่อไปนี้:
faust[rocksdb] : | สำหรับการใช้ RocksDB เพื่อจัดเก็บสถานะตาราง Faust แนะนำในการผลิต |
---|
faust[redis] : | สำหรับการใช้ Redis_ เป็นแบ็กเอนด์แคชแบบง่าย (สไตล์ Memcached) |
---|
faust[yaml] : | สำหรับการใช้ YAML และไลบรารี PyYAML ในสตรีม |
---|
faust[fast] : | สำหรับการติดตั้งส่วนขยายการเร่งความเร็ว C ทั้งหมดที่มีอยู่ไปยัง Faust core |
---|
faust[datadog] : | สำหรับการใช้มอนิเตอร์ Datadog Faust |
---|---|
faust[statsd] : | สำหรับการใช้จอภาพ Statsd Faust |
faust[uvloop] : | สำหรับการใช้ Faust กับ uvloop |
---|---|
faust[eventlet] : | สำหรับการใช้ Faust กับ eventlet |
faust[debug] : | สำหรับการใช้ aiomonitor เพื่อเชื่อมต่อและดีบักผู้ปฏิบัติงาน Faust ที่ทำงานอยู่ |
---|---|
faust[setproctitle] : | เมื่อติดตั้งโมดูล setproctitle แล้ว พนักงาน Faust จะใช้มันเพื่อตั้งชื่อกระบวนการที่ดีกว่าในรายการ ps / top ติดตั้งด้วยชุดรวม fast และ debug |
ดาวน์โหลด Faust เวอร์ชันล่าสุดได้จากhttp://pypi.org/project/faust
คุณสามารถติดตั้งได้โดยทำดังนี้
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
คำสั่งสุดท้ายจะต้องดำเนินการในฐานะผู้ใช้ที่มีสิทธิ์หากคุณไม่ได้ใช้ virtualenv ในปัจจุบัน
คุณสามารถติดตั้งสแน็ปช็อตล่าสุดของ Faust ได้โดยใช้คำสั่ง pip
ต่อไปนี้:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
ใช่! ใช้ eventlet
เป็นสะพานเพื่อรวมเข้ากับ asyncio
eventlet
วิธีการนี้ใช้ได้กับไลบรารี Python ที่บล็อกใด ๆ ที่สามารถทำงานกับ eventlet
ได้
การใช้ eventlet
ต้องการให้คุณติดตั้งโมดูล aioeventlet
และคุณสามารถติดตั้งเป็นบันเดิลพร้อมกับ Faust:
$ pip install -U faust[eventlet]
จากนั้นหากต้องการใช้ eventlet เป็น event loop คุณต้องใช้อาร์กิวเมนต์ -L <faust --loop>
กับโปรแกรม faust
:
$ faust -L eventlet -A myproj worker -l info
หรือเพิ่ม import mode.loop.eventlet
ที่ด้านบนของสคริปต์จุดเริ่มต้นของคุณ:
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
คำเตือน
สิ่งสำคัญมากคือตำแหน่งนี้จะอยู่ที่ด้านบนสุดของโมดูล และจะต้องดำเนินการก่อนที่คุณจะนำเข้าไลบรารี
ใช่! ใช้สะพาน tornado.platform.asyncio
: http://www.tornadoweb.org/en/stable/asyncio.html
ใช่! ใช้การใช้งานเครื่องปฏิกรณ์ asyncio
: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
ไม่ Faust ต้องการ Python 3.6 หรือใหม่กว่า เนื่องจากใช้ฟีเจอร์ที่เปิดตัวใน Python 3.6 อย่างมาก (async, await, คำอธิบายประกอบประเภทตัวแปร)
คุณอาจต้องเพิ่มขีดจำกัดสำหรับจำนวนไฟล์สูงสุดที่เปิดอยู่ โพสต์ต่อไปนี้อธิบายวิธีการดังกล่าวบน OS X: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust รองรับ kafka ด้วยเวอร์ชัน >= 0.10
สำหรับการอภิปรายเกี่ยวกับการใช้งาน การพัฒนา และอนาคตของ Faust โปรดเข้าร่วม `fauststream`_ Slack
หากคุณมีข้อเสนอแนะ รายงานข้อผิดพลาด หรือการรบกวน โปรดรายงานไปยังเครื่องมือติดตามปัญหาของเราที่ https://github.com/robinhood/faust/issues/
ซอฟต์แวร์นี้ได้รับอนุญาตภายใต้ใบอนุญาต BSD ใหม่ ดูไฟล์ LICENSE
ในไดเร็กทอรีการแจกจ่ายระดับบนสุดสำหรับข้อความลิขสิทธิ์ฉบับเต็ม
การพัฒนา Faust เกิดขึ้นที่ GitHub: https://github.com/robinhood/faust
คุณได้รับการสนับสนุนอย่างมากให้มีส่วนร่วมในการพัฒนาเฟาสท์
อย่าลืมอ่านส่วนการมีส่วนร่วมกับเฟาสท์ในเอกสารประกอบด้วย
ทุกคนที่โต้ตอบในฐานรหัสของโครงการ เครื่องมือติดตามปัญหา ห้องสนทนา และรายชื่อผู้รับจดหมาย จะต้องปฏิบัติตามหลักจรรยาบรรณของ Faust
ในฐานะผู้มีส่วนร่วมและผู้ดูแลโครงการเหล่านี้ และเพื่อประโยชน์ในการส่งเสริมชุมชนที่เปิดกว้างและเป็นมิตร เราให้คำมั่นที่จะเคารพทุกคนที่มีส่วนร่วมผ่านการรายงานปัญหา การโพสต์คำขอคุณสมบัติ การอัปเดตเอกสาร การส่งคำขอดึงข้อมูลหรือแพตช์ และกิจกรรมอื่น ๆ
เรามุ่งมั่นที่จะทำให้การมีส่วนร่วมในโครงการเหล่านี้เป็นประสบการณ์ที่ปราศจากการคุกคามสำหรับทุกคน โดยไม่คำนึงถึงระดับของประสบการณ์ เพศ อัตลักษณ์และการแสดงออกทางเพศ รสนิยมทางเพศ ความพิการ รูปลักษณ์ส่วนตัว ขนาดร่างกาย เชื้อชาติ ชาติพันธุ์ อายุ ศาสนา หรือ สัญชาติ.
ตัวอย่างพฤติกรรมที่ผู้เข้าร่วมไม่สามารถยอมรับได้ ได้แก่:
ผู้ดูแลโครงการมีสิทธิ์และความรับผิดชอบในการลบ แก้ไข หรือปฏิเสธความคิดเห็น การคอมมิต รหัส การแก้ไขวิกิ ปัญหา และการสนับสนุนอื่น ๆ ที่ไม่สอดคล้องกับหลักจรรยาบรรณนี้ การนำหลักจรรยาบรรณนี้มาใช้ ผู้ดูแลโครงการมุ่งมั่นที่จะนำหลักการเหล่านี้ไปใช้กับทุกด้านของการจัดการโครงการนี้อย่างยุติธรรมและสม่ำเสมอ ผู้ดูแลโครงการที่ไม่ปฏิบัติตามหรือบังคับใช้หลักจรรยาบรรณอาจถูกถอดออกจากทีมงานโครงการอย่างถาวร
หลักจรรยาบรรณนี้ใช้ทั้งภายในพื้นที่โครงการและในพื้นที่สาธารณะเมื่อบุคคลเป็นตัวแทนของโครงการหรือชุมชน
กรณีของพฤติกรรมที่ไม่เหมาะสม คุกคาม หรือยอมรับไม่ได้อาจถูกรายงานโดยการเปิดปัญหาหรือติดต่อผู้ดูแลโครงการหนึ่งคนขึ้นไป
หลักจรรยาบรรณนี้ดัดแปลงมาจาก Contributor Covenant เวอร์ชัน 1.2.0 ดูได้ที่ http://contributor-covenant.org/version/1/2/0/