streamable
تلاعب يشبه التيار من التكرار
يقوم Stream[T]
بتزيين Iterable[T]
مع واجهة بطلاقة تمكن من التسلسل للعمليات البطيئة.
؟ طلِق | طرق السلسلة! |
؟ مطبوع | النوع المصمم و mypy قادر |
؟ كسول | يتم تقييم العمليات بتكاسل في وقت التكرار |
متزامنة | عبر المواضيع أو العمليات أو asyncio |
؟ ️ قوي | تم اختبار الوحدة لبيثون 3.7 إلى 3.14 بتغطية 100 ٪ |
؟ الحد الأدنى | pip install streamable مع عدم وجود تبعيات إضافية |
pip install streamable
from streamable import Stream
إنشاء Stream[T]
من Iterable[T]
.
integers : Stream [ int ] = Stream ( range ( 10 ))
Stream
S غير قابل للتغيير : تطبيق عملية إرجاع دفق جديد.
العمليات كسول : تم تقييمها فقط في وقت التكرار.
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError )
)
Stream[T]
كما تفعل على أي شيء آخر Iterable[T]
. >> > list ( inverses )
[ 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
>> > set ( inverses )
{ 0.5 , 1.0 , 0.2 , 0.33 , 0.25 , 0.17 , 0.14 , 0.12 , 0.11 }
>> > sum ( inverses )
2.82
>> > max ( inverses )
1.0
>> > from functools import reduce
>> > reduce (..., inverses )
>> > for inverse in inverses :
>> > ...
>> > inverses_iter = iter ( inverses )
>> > next ( inverses_iter )
1.0
>> > next ( inverses_iter )
0.5
.map
يطبق تحولًا على العناصر:
negative_integer_strings : Stream [ str ] = integers . map ( lambda n : - n ). map ( str )
assert list ( negative_integer_strings ) == [ '0' , '-1' , '-2' , '-3' , '-4' , '-5' , '-6' , '-7' , '-8' , '-9' ]
يطبق التحول عبر مؤشرات
concurrency
:
import requests
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. map ( requests . get , concurrency = 3 )
. map ( requests . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
يحافظ على ترتيب المنبع افتراضيًا (FIFO) ، ولكن يمكنك تعيين
ordered=False
for first Out Out .
concurrency
هو أيضًا حجم المخزن المؤقت الذي يحتوي على نتائج غير محمية. إذا كان المخزن المؤقت ممتلئًا ، يتم إيقاف التكرار على المنبع حتى يتم تحقيق النتيجة من المخزن المؤقت.
تعيين
via="process"
:
if __name__ == "__main__" :
state : List [ int ] = []
n_integers : int = integers . map ( state . append , concurrency = 4 , via = "process" ). count ()
assert n_integers == 10
assert state == [] # main process's state not mutated
عملية الأخوة
.amap
import httpx
import asyncio
http_async_client = httpx . AsyncClient ()
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. amap ( http_async_client . get , concurrency = 3 )
. map ( httpx . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
asyncio . get_event_loop (). run_until_complete ( http_async_client . aclose ())
يحول دائرة دالة
star
وظيفة تأخذ العديد من الوسائط الموضعية إلى وظيفة تتطلب تناقصًا:
from streamable import star
zeros : Stream [ int ] = (
Stream ( enumerate ( integers ))
. map ( star ( lambda index , integer : index - integer ))
)
assert list ( zeros ) == [ 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
مريحة أيضا مع
.foreach
.filter
.
.foreach
يطبق تأثيرًا جانبيًا على العناصر:
self_printing_integers : Stream [ int ] = integers . foreach ( print )
assert list ( self_printing_integers ) == list ( integers ) # triggers the printing
مثل
.map
لديها معلمةconcurrency
اختيارية.
مثل
.map
، اضبط المعلمةvia="process"
.
مثل
.map
لديها عملية.aforeach
.
.filter
يحتفظ فقط بالعناصر التي ترضي الشرط:
pair_integers : Stream [ int ] = integers . filter ( lambda n : n % 2 == 0 )
assert list ( pair_integers ) == [ 0 , 2 , 4 , 6 , 8 ]
.throttle
يحد من عدد العائدات
per_second
/per_minute
/per_hour
:
slow_integers : Stream [ int ] = integers . throttle ( per_second = 5 )
assert list ( slow_integers ) == list ( integers ) # takes 10 / 5 = 2 seconds
و/أو ضمان الحد الأدنى من
interval
الزمني يفصل العائدات المتتالية:
from datetime import timedelta
slow_integers = integers . throttle ( interval = timedelta ( milliseconds = 100 ))
assert list ( slow_integers ) == list ( integers ) # takes 10 * 0.1 = 1 second
.group
مجموعات العناصر في
List
S:
integers_5_by_5 : Stream [ List [ int ]] = integers . group ( size = 5 )
assert list ( integers_5_by_5 ) == [[ 0 , 1 , 2 , 3 , 4 ], [ 5 , 6 , 7 , 8 , 9 ]]
integers_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 )
assert list ( integers_by_parity ) == [[ 0 , 2 , 4 , 6 , 8 ], [ 1 , 3 , 5 , 7 , 9 ]]
from datetime import timedelta
integers_within_1s : Stream [ List [ int ]] = (
integers
. throttle ( per_second = 2 )
. group ( interval = timedelta ( seconds = 0.99 ))
)
assert list ( integers_within_1s ) == [[ 0 , 1 , 2 ], [ 3 , 4 ], [ 5 , 6 ], [ 7 , 8 ], [ 9 ]]
مزيج
size
/by
/ معلماتinterval
:
integers_2_by_2_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 , size = 2 )
assert list ( integers_2_by_2_by_parity ) == [[ 0 , 2 ], [ 1 , 3 ], [ 4 , 6 ], [ 5 , 7 ], [ 8 ], [ 9 ]]
.flatten
عناصر غير مجموعات على افتراض أنها
Iterable
.
pair_then_odd_integers : Stream [ int ] = integers_by_parity . flatten ()
assert list ( pair_then_odd_integers ) == [ 0 , 2 , 4 , 6 , 8 , 1 , 3 , 5 , 7 , 9 ]
تكرار
concurrency
بشكل متزامن:
mix_of_0s_and_1s : Stream [ int ] = Stream ([[ 0 ] * 4 , [ 1 ] * 4 ]). flatten ( concurrency = 2 )
assert list ( mix_of_0s_and_1s ) == [ 0 , 1 , 0 , 1 , 0 , 1 , 0 , 1 ]
.catch
يمسك نوعًا معينًا من الاستثناءات ، ويحسب اختياريًا قيمة
replacement
:
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError , replacement = float ( "inf" ))
)
assert list ( inverses ) == [ float ( "inf" ), 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
يمكنك تحديد شرط إضافي
when
الصيد:
import requests
from requests . exceptions import SSLError
status_codes_ignoring_resolution_errors : Stream [ int ] = (
Stream ([ "https://github.com" , "https://foo.bar" , "https://github.com/foo/bar" ])
. map ( requests . get , concurrency = 2 )
. catch ( SSLError , when = lambda exception : "Max retries exceeded with url" in str ( exception ))
. map ( lambda response : response . status_code )
)
assert list ( status_codes_ignoring_resolution_errors ) == [ 200 , 404 ]
يحتوي على معلمة Bool اختيارية
finally_raise: bool
لرفع الاستثناء الأول الذي تم صيده عند انتهاء التكرار.
.truncate
ينتهي التكرار بمجرد أن يتم إنتاج عدد معين من العناصر:
five_first_integers : Stream [ int ] = integers . truncate ( 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
... أو عندما تصبح حالة راضية:
five_first_integers : Stream [ int ] = integers . truncate ( when = lambda n : n == 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
.observe
يسجل تقدم التكرارات على هذا الدفق ، إذا كنت تكرر:
observed_slow_integers : Stream [ int ] = slow_integers . observe ( "integers" )
سوف تحصل على هذه السجلات:
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
لن تكون كمية السجلات ساحقة أبدًا لأنها يتم إنتاجها لوغاريتمي ، على سبيل المثال ، سيتم إنتاج السجل الحادي عشر عندما يصل التكرار إلى العنصر 1024.
تحذير
إنه كتم بين v1.1.0 و v1.3.1 ، يرجى pip install --upgrade streamable
zip
استخدم وظيفة
zip
القياسية:
from streamable import star
cubes : Stream [ int ] = (
Stream ( zip ( integers , integers , integers )) # Stream[Tuple[int, int, int]]
. map ( star ( lambda a , b , c : a * b * c ))
)
assert list ( cubes ) == [ 0 , 1 , 8 , 27 , 64 , 125 , 216 , 343 , 512 , 729 ]
الرجاء مساعدتنا! أشعر مرحبًا بكم في:
.count
>> > assert integers . count () == 10
يكرر استدعاء الدفق فوقه حتى الإرهاق ويعيده.
>> > verbose_integers : Stream [ int ] = integers . foreach ( print )
>> > assert verbose_integers () is verbose_integers
0
1
2
3
4
5
6
7
8
9
يمكن أن تستفيد البرامج النصية ETL (أي البرامج النصية التي تجلبها -> المعالجة -> دفع البيانات) من تعبير هذه المكتبة.
فيما يلي مثال على أنه يمكنك نسخ الزجاج والمحاولة (يتطلب فقط requests
): إنه ينشئ ملف CSV يحتوي على جميع الأجيال الـ 67 من الأجيال الأولى والثانية والثالثة من Pokémons (Kudos to Pokéapi)
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
with open ( "./quadruped_pokemons.csv" , mode = "w" ) as file :
fields = [ "id" , "name" , "is_legendary" , "base_happiness" , "capture_rate" ]
writer = csv . DictWriter ( file , fields , extrasaction = 'ignore' )
writer . writeheader ()
(
# Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
Stream ( itertools . count ( 1 ))
# Limits to 16 requests per second to be friendly to our fellow PokéAPI devs
. throttle ( per_second = 16 )
# GETs pokemons concurrently using a pool of 8 threads
. map ( lambda poke_id : f"https://pokeapi.co/api/v2/pokemon-species/ { poke_id } " )
. map ( requests . get , concurrency = 8 )
. foreach ( requests . Response . raise_for_status )
. map ( requests . Response . json )
# Stops the iteration when reaching the 1st pokemon of the 4th generation
. truncate ( when = lambda poke : poke [ "generation" ][ "name" ] == "generation-iv" )
. observe ( "pokemons" )
# Keeps only quadruped Pokemons
. filter ( lambda poke : poke [ "shape" ][ "name" ] == "quadruped" )
. observe ( "quadruped pokemons" )
# Catches errors due to None "generation" or "shape"
. catch (
TypeError ,
when = lambda error : str ( error ) == "'NoneType' object is not subscriptable"
)
# Writes a batch of pokemons every 5 seconds to the CSV file
. group ( interval = timedelta ( seconds = 5 ))
. foreach ( writer . writerows )
. flatten ()
. observe ( "written pokemons" )
# Catches exceptions and raises the 1st one at the end of the iteration
. catch ( finally_raise = True )
# Actually triggers an iteration (the lines above define lazy operations)
. count ()
)
logging . getLogger ( "streamable" ). setLevel ( logging . WARNING ) # default is INFO
تعرض فئة Stream
طريقة .accept
ويمكنك تنفيذ زائر عن طريق توسيع نطاق الفئة streamable.visitors.Visitor
.
from streamable . visitors import Visitor
class DepthVisitor ( Visitor [ int ]):
def visit_stream ( self , stream : Stream ) -> int :
if not stream . upstream :
return 1
return 1 + stream . upstream . accept ( self )
def depth ( stream : Stream ) -> int :
return stream . accept ( DepthVisitor ())
assert depth ( Stream ( range ( 10 )). map ( str ). filter ()) == 3
تتعرض طرق Stream
أيضًا كوظائف:
from streamable . functions import catch
inverse_integers : Iterator [ int ] = map ( lambda n : 1 / n , range ( 10 ))
safe_inverse_integers : Iterator [ int ] = catch ( inverse_integers , ZeroDivisionError )
تستفيد من Bython 3.13+ الحرة ، وتشغيلها عبر python -X gil=0
.