streamable
การยักย้ายถ่ายเทเหมือนลำธารของ iterables
Stream[T]
ตกแต่ง Iterable[T]
ด้วย อินเทอร์เฟซที่คล่องแคล่ว ทำให้สามารถใช้งานได้ของการดำเนินงานที่ขี้เกียจ
- คล่องแคล่ว | วิธีลูกโซ่! |
- ที่พิมพ์ออกมา | พิมพ์คำย่อ และ mypy สามารถ |
- ขี้เกียจ | การดำเนินงานได้ รับการประเมินอย่างเกียจคร้าน ในเวลาวนซ้ำ |
พร้อมกัน | ผ่าน เธรด หรือ กระบวนการ หรือ asyncio |
? ️ แข็งแกร่ง | การทดสอบหน่วยสำหรับ Python 3.7 ถึง 3.14 พร้อมความคุ้มครอง 100% |
- เรียบง่าย | pip install streamable โดย ไม่มีการอ้างอิงเพิ่มเติม |
pip install streamable
from streamable import Stream
อินสแตน Stream[T]
จาก Iterable[T]
integers : Stream [ int ] = Stream ( range ( 10 ))
Stream
S ไม่เปลี่ยนรูป : การใช้การดำเนินการส่งคืนสตรีมใหม่
การดำเนินงาน ขี้เกียจ : ประเมินเฉพาะเวลาซ้ำเท่านั้น
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError )
)
Stream[T]
ตามที่คุณ Iterable[T]
อื่น ๆ >> > list ( inverses )
[ 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
>> > set ( inverses )
{ 0.5 , 1.0 , 0.2 , 0.33 , 0.25 , 0.17 , 0.14 , 0.12 , 0.11 }
>> > sum ( inverses )
2.82
>> > max ( inverses )
1.0
>> > from functools import reduce
>> > reduce (..., inverses )
>> > for inverse in inverses :
>> > ...
>> > inverses_iter = iter ( inverses )
>> > next ( inverses_iter )
1.0
>> > next ( inverses_iter )
0.5
.map
ใช้การเปลี่ยนแปลงองค์ประกอบ:
negative_integer_strings : Stream [ str ] = integers . map ( lambda n : - n ). map ( str )
assert list ( negative_integer_strings ) == [ '0' , '-1' , '-2' , '-3' , '-4' , '-5' , '-6' , '-7' , '-8' , '-9' ]
ใช้การแปลงผ่านเธรด
concurrency
:
import requests
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. map ( requests . get , concurrency = 3 )
. map ( requests . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
รักษาลำดับต้นน้ำตามค่าเริ่มต้น (FIFO) แต่คุณสามารถตั้ง
ordered=False
สำหรับ การทำครั้งแรกก่อน
concurrency
กันเป็นขนาดของบัฟเฟอร์ที่มีผลลัพธ์ที่ไม่ได้ผล หากบัฟเฟอร์เต็มการวนซ้ำของต้นน้ำจะถูกหยุดชั่วคราว จนกว่าจะได้ผลลัพธ์จากบัฟเฟอร์
ตั้งค่า
via="process"
:
if __name__ == "__main__" :
state : List [ int ] = []
n_integers : int = integers . map ( state . append , concurrency = 4 , via = "process" ). count ()
assert n_integers == 10
assert state == [] # main process's state not mutated
การดำเนินการของพี่น้อง
.amap
ใช้ฟังก์ชั่น async:
import httpx
import asyncio
http_async_client = httpx . AsyncClient ()
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. amap ( http_async_client . get , concurrency = 3 )
. map ( httpx . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
asyncio . get_event_loop (). run_until_complete ( http_async_client . aclose ())
ฟังก์ชั่นส
star
Decorator แปลงฟังก์ชั่นที่ใช้อาร์กิวเมนต์ตำแหน่งหลายตำแหน่งเป็นฟังก์ชั่นที่ใช้ tuple:
from streamable import star
zeros : Stream [ int ] = (
Stream ( enumerate ( integers ))
. map ( star ( lambda index , integer : index - integer ))
)
assert list ( zeros ) == [ 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
สะดวกด้วย
.foreach
.filter
, ...
.foreach
ใช้ผลข้างเคียงกับองค์ประกอบ:
self_printing_integers : Stream [ int ] = integers . foreach ( print )
assert list ( self_printing_integers ) == list ( integers ) # triggers the printing
เช่น
.map
มันมีพารามิเตอร์concurrency
กันที่เป็นตัวเลือก
เช่นเดียวกับ
.map
ตั้งค่าพารามิเตอร์via="process"
เช่น
.map
มันมีการดำเนินการ.aforeach
พี่น้องสำหรับ async
.filter
รักษาองค์ประกอบที่เป็นไปตามเงื่อนไขเท่านั้น:
pair_integers : Stream [ int ] = integers . filter ( lambda n : n % 2 == 0 )
assert list ( pair_integers ) == [ 0 , 2 , 4 , 6 , 8 ]
.throttle
จำกัด จำนวนผลผลิต
per_second
/per_minute
/per_hour
:
slow_integers : Stream [ int ] = integers . throttle ( per_second = 5 )
assert list ( slow_integers ) == list ( integers ) # takes 10 / 5 = 2 seconds
และ/หรือตรวจสอบ
interval
เวลาขั้นต่ำแยกผลผลิตต่อเนื่อง:
from datetime import timedelta
slow_integers = integers . throttle ( interval = timedelta ( milliseconds = 100 ))
assert list ( slow_integers ) == list ( integers ) # takes 10 * 0.1 = 1 second
.group
องค์ประกอบกลุ่มลงใน
List
S:
integers_5_by_5 : Stream [ List [ int ]] = integers . group ( size = 5 )
assert list ( integers_5_by_5 ) == [[ 0 , 1 , 2 , 3 , 4 ], [ 5 , 6 , 7 , 8 , 9 ]]
integers_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 )
assert list ( integers_by_parity ) == [[ 0 , 2 , 4 , 6 , 8 ], [ 1 , 3 , 5 , 7 , 9 ]]
from datetime import timedelta
integers_within_1s : Stream [ List [ int ]] = (
integers
. throttle ( per_second = 2 )
. group ( interval = timedelta ( seconds = 0.99 ))
)
assert list ( integers_within_1s ) == [[ 0 , 1 , 2 ], [ 3 , 4 ], [ 5 , 6 ], [ 7 , 8 ], [ 9 ]]
size
ผสม /by
/ พารามิเตอร์interval
:
integers_2_by_2_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 , size = 2 )
assert list ( integers_2_by_2_by_parity ) == [[ 0 , 2 ], [ 1 , 3 ], [ 4 , 6 ], [ 5 , 7 ], [ 8 ], [ 9 ]]
.flatten
องค์ประกอบที่ไม่ได้จัดกลุ่มสมมติว่าพวกเขา
Iterable
pair_then_odd_integers : Stream [ int ] = integers_by_parity . flatten ()
assert list ( pair_then_odd_integers ) == [ 0 , 2 , 4 , 6 , 8 , 1 , 3 , 5 , 7 , 9 ]
flattens
concurrency
กัน iterables พร้อมกัน:
mix_of_0s_and_1s : Stream [ int ] = Stream ([[ 0 ] * 4 , [ 1 ] * 4 ]). flatten ( concurrency = 2 )
assert list ( mix_of_0s_and_1s ) == [ 0 , 1 , 0 , 1 , 0 , 1 , 0 , 1 ]
.catch
จับข้อยกเว้นประเภทที่กำหนดและเลือกค่า
replacement
: เป็นทางเลือก:
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError , replacement = float ( "inf" ))
)
assert list ( inverses ) == [ float ( "inf" ), 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
คุณสามารถระบุเพิ่มเติม
when
เงื่อนไขสำหรับการจับ:
import requests
from requests . exceptions import SSLError
status_codes_ignoring_resolution_errors : Stream [ int ] = (
Stream ([ "https://github.com" , "https://foo.bar" , "https://github.com/foo/bar" ])
. map ( requests . get , concurrency = 2 )
. catch ( SSLError , when = lambda exception : "Max retries exceeded with url" in str ( exception ))
. map ( lambda response : response . status_code )
)
assert list ( status_codes_ignoring_resolution_errors ) == [ 200 , 404 ]
มันมีพารามิเตอร์
finally_raise: bool
ที่เป็นตัวเลือกในการเพิ่มข้อยกเว้นครั้งแรกเมื่อการวนซ้ำสิ้นสุดลง
.truncate
สิ้นสุดการทำซ้ำเมื่อมีการให้องค์ประกอบจำนวนมาก:
five_first_integers : Stream [ int ] = integers . truncate ( 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
... หรือเมื่อเงื่อนไขได้รับความพึงพอใจ:
five_first_integers : Stream [ int ] = integers . truncate ( when = lambda n : n == 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
.observe
บันทึกความคืบหน้าของการวนซ้ำผ่านสตรีมนี้หากคุณวนซ้ำ:
observed_slow_integers : Stream [ int ] = slow_integers . observe ( "integers" )
คุณจะได้รับบันทึกเหล่านี้:
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
จำนวนของบันทึกจะไม่เกิดขึ้นเพราะพวกเขาผลิตลอการิทึมเช่นบันทึกที่ 11 จะเกิดขึ้นเมื่อการทำซ้ำมาถึงองค์ประกอบ 1024th
คำเตือน
มันปิดเสียงระหว่าง v1.1.0 และ v1.3.1 โปรด pip install --upgrade streamable
zip
ใช้ฟังก์ชั่น
zip
มาตรฐาน:
from streamable import star
cubes : Stream [ int ] = (
Stream ( zip ( integers , integers , integers )) # Stream[Tuple[int, int, int]]
. map ( star ( lambda a , b , c : a * b * c ))
)
assert list ( cubes ) == [ 0 , 1 , 8 , 27 , 64 , 125 , 216 , 343 , 512 , 729 ]
โปรดช่วยเราด้วย! รู้สึกยินดีเป็นอย่างยิ่ง:
.count
วนซ้ำผ่านกระแสจนอ่อนตัวและส่งคืนการนับองค์ประกอบที่ให้ผล
>> > assert integers . count () == 10
การเรียก กระแสวนซ้ำไปจนหมดและส่งคืน
>> > verbose_integers : Stream [ int ] = integers . foreach ( print )
>> > assert verbose_integers () is verbose_integers
0
1
2
3
4
5
6
7
8
9
สคริปต์ ETL (เช่นสคริปต์การดึงข้อมูล -> การประมวลผล -> การผลักดันข้อมูล) สามารถได้รับประโยชน์จากการแสดงออกของไลบรารีนี้
นี่คือตัวอย่างที่คุณสามารถ คัดลอกวางและลอง (ต้องการเฉพาะ requests
เท่านั้น): มันสร้างไฟล์ CSV ที่มี 67 quadrupeds ทั้งหมดจากPokémonsรุ่นที่ 1, 2 และ 3
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
with open ( "./quadruped_pokemons.csv" , mode = "w" ) as file :
fields = [ "id" , "name" , "is_legendary" , "base_happiness" , "capture_rate" ]
writer = csv . DictWriter ( file , fields , extrasaction = 'ignore' )
writer . writeheader ()
(
# Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
Stream ( itertools . count ( 1 ))
# Limits to 16 requests per second to be friendly to our fellow PokéAPI devs
. throttle ( per_second = 16 )
# GETs pokemons concurrently using a pool of 8 threads
. map ( lambda poke_id : f"https://pokeapi.co/api/v2/pokemon-species/ { poke_id } " )
. map ( requests . get , concurrency = 8 )
. foreach ( requests . Response . raise_for_status )
. map ( requests . Response . json )
# Stops the iteration when reaching the 1st pokemon of the 4th generation
. truncate ( when = lambda poke : poke [ "generation" ][ "name" ] == "generation-iv" )
. observe ( "pokemons" )
# Keeps only quadruped Pokemons
. filter ( lambda poke : poke [ "shape" ][ "name" ] == "quadruped" )
. observe ( "quadruped pokemons" )
# Catches errors due to None "generation" or "shape"
. catch (
TypeError ,
when = lambda error : str ( error ) == "'NoneType' object is not subscriptable"
)
# Writes a batch of pokemons every 5 seconds to the CSV file
. group ( interval = timedelta ( seconds = 5 ))
. foreach ( writer . writerows )
. flatten ()
. observe ( "written pokemons" )
# Catches exceptions and raises the 1st one at the end of the iteration
. catch ( finally_raise = True )
# Actually triggers an iteration (the lines above define lazy operations)
. count ()
)
logging . getLogger ( "streamable" ). setLevel ( logging . WARNING ) # default is INFO
คลาส Stream
เปิดเผยวิธี .accept
และคุณสามารถใช้ ผู้เข้าชม ได้โดยขยาย streamable.visitors.Visitor
คลาสนามธรรม:
from streamable . visitors import Visitor
class DepthVisitor ( Visitor [ int ]):
def visit_stream ( self , stream : Stream ) -> int :
if not stream . upstream :
return 1
return 1 + stream . upstream . accept ( self )
def depth ( stream : Stream ) -> int :
return stream . accept ( DepthVisitor ())
assert depth ( Stream ( range ( 10 )). map ( str ). filter ()) == 3
วิธีการของ Stream
นั้นถูกเปิดเผยเป็นฟังก์ชั่น:
from streamable . functions import catch
inverse_integers : Iterator [ int ] = map ( lambda n : 1 / n , range ( 10 ))
safe_inverse_integers : Iterator [ int ] = catch ( inverse_integers , ZeroDivisionError )
ประโยชน์จากการสร้าง Free -Threaded Python 3.13+, รันผ่าน python -X gil=0