streamable
反復可能性の河川のような操作
Stream[T]
怠zyな操作のチェーンを可能にする流fluentインターフェイスでIterable[T]
を飾ります。
?流暢 | チェーンメソッド! |
?タイプしました | タイプは解決し、 mypy |
?怠け者 | 操作は、反復時間にゆっくりと評価されます |
同時 | スレッドまたはプロセス、またはasyncio 介して |
?§ロバスト | Python 3.7から3.14のユニットテストで100%のカバレッジ |
?ミニマリスト | pip install streamable 追加の依存関係なしでできます |
pip install streamable
from streamable import Stream
Iterable[T]
からStream[T]
をインスタンス化します。
integers : Stream [ int ] = Stream ( range ( 10 ))
Stream
Sは不変です。操作を適用すると、新しいストリームが返されます。
操作は怠zyです:反復時間でのみ評価されます。
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError )
)
Iterable[T]
を介してStream[T]
。 >> > list ( inverses )
[ 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
>> > set ( inverses )
{ 0.5 , 1.0 , 0.2 , 0.33 , 0.25 , 0.17 , 0.14 , 0.12 , 0.11 }
>> > sum ( inverses )
2.82
>> > max ( inverses )
1.0
>> > from functools import reduce
>> > reduce (..., inverses )
>> > for inverse in inverses :
>> > ...
>> > inverses_iter = iter ( inverses )
>> > next ( inverses_iter )
1.0
>> > next ( inverses_iter )
0.5
.map
要素に変換を適用します。
negative_integer_strings : Stream [ str ] = integers . map ( lambda n : - n ). map ( str )
assert list ( negative_integer_strings ) == [ '0' , '-1' , '-2' , '-3' , '-4' , '-5' , '-6' , '-7' , '-8' , '-9' ]
concurrency
スレッドを介して変換を適用します。
import requests
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. map ( requests . get , concurrency = 3 )
. map ( requests . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
デフォルトで上流の注文を保存します(FIFO)が、最初に完了した場合は
ordered=False
を設定できます。
concurrency
、まだ計算されていない結果を含むバッファーのサイズでもあります。バッファがいっぱいの場合、バッファから結果が得られるまで上流の反復が一時停止されます。
via="process"
を設定します:
if __name__ == "__main__" :
state : List [ int ] = []
n_integers : int = integers . map ( state . append , concurrency = 4 , via = "process" ). count ()
assert n_integers == 10
assert state == [] # main process's state not mutated
兄弟操作
.amap
は非同期関数を適用します。
import httpx
import asyncio
http_async_client = httpx . AsyncClient ()
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. amap ( http_async_client . get , concurrency = 3 )
. map ( httpx . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
asyncio . get_event_loop (). run_until_complete ( http_async_client . aclose ())
star
関数デコレーターは、いくつかの位置的引数を採用する関数を、タプルを取る関数に変換します。
from streamable import star
zeros : Stream [ int ] = (
Stream ( enumerate ( integers ))
. map ( star ( lambda index , integer : index - integer ))
)
assert list ( zeros ) == [ 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
また、
.foreach
、.filter
、...
.foreach
要素に副作用を適用します。
self_printing_integers : Stream [ int ] = integers . foreach ( print )
assert list ( self_printing_integers ) == list ( integers ) # triggers the printing
.map
のように、オプションのconcurrency
パラメーターがあります。
.map
のように、= "Processvia="process"
を設定します。
.map
のように、兄弟のための兄弟.aforeach
操作があります。
.filter
条件を満たす要素のみを保持します。
pair_integers : Stream [ int ] = integers . filter ( lambda n : n % 2 == 0 )
assert list ( pair_integers ) == [ 0 , 2 , 4 , 6 , 8 ]
.throttle
収量の数を制限
per_second
/per_minute
/per_hour
:
slow_integers : Stream [ int ] = integers . throttle ( per_second = 5 )
assert list ( slow_integers ) == list ( integers ) # takes 10 / 5 = 2 seconds
および/または最小時間
interval
連続した収量を分離することを確認してください。
from datetime import timedelta
slow_integers = integers . throttle ( interval = timedelta ( milliseconds = 100 ))
assert list ( slow_integers ) == list ( integers ) # takes 10 * 0.1 = 1 second
.group
List
sへの要素をグループ化:
integers_5_by_5 : Stream [ List [ int ]] = integers . group ( size = 5 )
assert list ( integers_5_by_5 ) == [[ 0 , 1 , 2 , 3 , 4 ], [ 5 , 6 , 7 , 8 , 9 ]]
integers_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 )
assert list ( integers_by_parity ) == [[ 0 , 2 , 4 , 6 , 8 ], [ 1 , 3 , 5 , 7 , 9 ]]
from datetime import timedelta
integers_within_1s : Stream [ List [ int ]] = (
integers
. throttle ( per_second = 2 )
. group ( interval = timedelta ( seconds = 0.99 ))
)
assert list ( integers_within_1s ) == [[ 0 , 1 , 2 ], [ 3 , 4 ], [ 5 , 6 ], [ 7 , 8 ], [ 9 ]]
size
/by
/interval
パラメーターを混合します:
integers_2_by_2_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 , size = 2 )
assert list ( integers_2_by_2_by_parity ) == [[ 0 , 2 ], [ 1 , 3 ], [ 4 , 6 ], [ 5 , 7 ], [ 8 ], [ 9 ]]
.flatten
それらが
Iterable
であると仮定した非グループの要素。
pair_then_odd_integers : Stream [ int ] = integers_by_parity . flatten ()
assert list ( pair_then_odd_integers ) == [ 0 , 2 , 4 , 6 , 8 , 1 , 3 , 5 , 7 , 9 ]
同時に同時に
concurrency
になります:
mix_of_0s_and_1s : Stream [ int ] = Stream ([[ 0 ] * 4 , [ 1 ] * 4 ]). flatten ( concurrency = 2 )
assert list ( mix_of_0s_and_1s ) == [ 0 , 1 , 0 , 1 , 0 , 1 , 0 , 1 ]
.catch
特定のタイプの例外をキャッチし、オプションで
replacement
値を生成します。
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError , replacement = float ( "inf" ))
)
assert list ( inverses ) == [ float ( "inf" ), 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
キャッチの条件を追加する
when
ができます。
import requests
from requests . exceptions import SSLError
status_codes_ignoring_resolution_errors : Stream [ int ] = (
Stream ([ "https://github.com" , "https://foo.bar" , "https://github.com/foo/bar" ])
. map ( requests . get , concurrency = 2 )
. catch ( SSLError , when = lambda exception : "Max retries exceeded with url" in str ( exception ))
. map ( lambda response : response . status_code )
)
assert list ( status_codes_ignoring_resolution_errors ) == [ 200 , 404 ]
オプションの
finally_raise: bool
パラメーターがあり、イテレーションが終了したときに最初にキャッチされた例外を上げます。
.truncate
特定の数の要素が得られたら、反復を終了します。
five_first_integers : Stream [ int ] = integers . truncate ( 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
...または条件が満たされたとき:
five_first_integers : Stream [ int ] = integers . truncate ( when = lambda n : n == 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
.observe
繰り返す場合、このストリーム上の反復の進行を記録します。
observed_slow_integers : Stream [ int ] = slow_integers . observe ( "integers" )
これらのログを取得します:
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
ログの量は、反復が1024番目の要素に達すると対数的に生成されるため、圧倒的ではありません。
警告
v1.1.0とv1.3.1の間でpip install --upgrade streamable
います。
zip
標準の
zip
関数を使用します:
from streamable import star
cubes : Stream [ int ] = (
Stream ( zip ( integers , integers , integers )) # Stream[Tuple[int, int, int]]
. map ( star ( lambda a , b , c : a * b * c ))
)
assert list ( cubes ) == [ 0 , 1 , 8 , 27 , 64 , 125 , 216 , 343 , 512 , 729 ]
助けてください!とても歓迎してください:
.count
疲労するまでストリームを反復し、生成された要素のカウントを返します。
>> > assert integers . count () == 10
ストリームを呼び出すと、疲労するまで繰り返して返します。
>> > verbose_integers : Stream [ int ] = integers . foreach ( print )
>> > assert verbose_integers () is verbose_integers
0
1
2
3
4
5
6
7
8
9
ETLスクリプト(つまり、スクリプトの取得 - >処理 - >データのプッシュ)は、このライブラリの表現力から利益を得ることができます。
コピーパステと試してみることができる例は次のとおりです( requests
のみが必要です):1世代、第2世代、第3世代のポケモン(ポケアピまで)のすべての67の四足動物を含むCSVファイルを作成します。
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
with open ( "./quadruped_pokemons.csv" , mode = "w" ) as file :
fields = [ "id" , "name" , "is_legendary" , "base_happiness" , "capture_rate" ]
writer = csv . DictWriter ( file , fields , extrasaction = 'ignore' )
writer . writeheader ()
(
# Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
Stream ( itertools . count ( 1 ))
# Limits to 16 requests per second to be friendly to our fellow PokéAPI devs
. throttle ( per_second = 16 )
# GETs pokemons concurrently using a pool of 8 threads
. map ( lambda poke_id : f"https://pokeapi.co/api/v2/pokemon-species/ { poke_id } " )
. map ( requests . get , concurrency = 8 )
. foreach ( requests . Response . raise_for_status )
. map ( requests . Response . json )
# Stops the iteration when reaching the 1st pokemon of the 4th generation
. truncate ( when = lambda poke : poke [ "generation" ][ "name" ] == "generation-iv" )
. observe ( "pokemons" )
# Keeps only quadruped Pokemons
. filter ( lambda poke : poke [ "shape" ][ "name" ] == "quadruped" )
. observe ( "quadruped pokemons" )
# Catches errors due to None "generation" or "shape"
. catch (
TypeError ,
when = lambda error : str ( error ) == "'NoneType' object is not subscriptable"
)
# Writes a batch of pokemons every 5 seconds to the CSV file
. group ( interval = timedelta ( seconds = 5 ))
. foreach ( writer . writerows )
. flatten ()
. observe ( "written pokemons" )
# Catches exceptions and raises the 1st one at the end of the iteration
. catch ( finally_raise = True )
# Actually triggers an iteration (the lines above define lazy operations)
. count ()
)
logging . getLogger ( "streamable" ). setLevel ( logging . WARNING ) # default is INFO
Stream
Classは.accept
メソッドを公開し、 streamable.visitors.Visitor
Abstractクラスを拡張することで訪問者を実装できます。
from streamable . visitors import Visitor
class DepthVisitor ( Visitor [ int ]):
def visit_stream ( self , stream : Stream ) -> int :
if not stream . upstream :
return 1
return 1 + stream . upstream . accept ( self )
def depth ( stream : Stream ) -> int :
return stream . accept ( DepthVisitor ())
assert depth ( Stream ( range ( 10 )). map ( str ). filter ()) == 3
Stream
のメソッドも関数として公開されます。
from streamable . functions import catch
inverse_integers : Iterator [ int ] = map ( lambda n : 1 / n , range ( 10 ))
safe_inverse_integers : Iterator [ int ] = catch ( inverse_integers , ZeroDivisionError )
python -X gil=0
を介して実行されるフリースレッドPython 3.13+ビルドのメリット。