streamable
Manipulasi aliran seperti iterable
Stream[T]
menghiasi Iterable[T]
dengan antarmuka yang lancar memungkinkan rantai operasi malas.
? Fasih | Metode rantai! |
? Diketik | Tipe-dianotasi dan mypy mampu |
? Malas | Operasi dievaluasi dengan malas pada waktu iterasi |
Bersamaan | melalui utas atau proses atau asyncio |
? ️ kuat | Unit-Test untuk Python 3.7 hingga 3.14 dengan cakupan 100% |
? Minimalis | pip install streamable Tanpa Ketergantungan Tambahan |
pip install streamable
from streamable import Stream
Instantiate Stream[T]
dari Iterable[T]
.
integers : Stream [ int ] = Stream ( range ( 10 ))
Stream
S tidak dapat diubah : Menerapkan operasi mengembalikan aliran baru.
Operasi malas : hanya dievaluasi pada waktu iterasi.
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError )
)
Stream[T]
seperti yang Anda lakukan pada apa pun Iterable[T]
. >> > list ( inverses )
[ 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
>> > set ( inverses )
{ 0.5 , 1.0 , 0.2 , 0.33 , 0.25 , 0.17 , 0.14 , 0.12 , 0.11 }
>> > sum ( inverses )
2.82
>> > max ( inverses )
1.0
>> > from functools import reduce
>> > reduce (..., inverses )
>> > for inverse in inverses :
>> > ...
>> > inverses_iter = iter ( inverses )
>> > next ( inverses_iter )
1.0
>> > next ( inverses_iter )
0.5
.map
Menerapkan transformasi pada elemen:
negative_integer_strings : Stream [ str ] = integers . map ( lambda n : - n ). map ( str )
assert list ( negative_integer_strings ) == [ '0' , '-1' , '-2' , '-3' , '-4' , '-5' , '-6' , '-7' , '-8' , '-9' ]
Menerapkan transformasi melalui utas
concurrency
:
import requests
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. map ( requests . get , concurrency = 3 )
. map ( requests . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
Mempertahankan pesanan hulu secara default (FIFO), tetapi Anda dapat mengatur
ordered=False
untuk pertama kali dilakukan terlebih dahulu .
concurrency
juga merupakan ukuran buffer yang mengandung hasil yang belum dihasilkan. Jika buffer penuh, iterasi di atas hulu dijeda sampai hasilnya dihasilkan dari buffer.
Setel
via="process"
:
if __name__ == "__main__" :
state : List [ int ] = []
n_integers : int = integers . map ( state . append , concurrency = 4 , via = "process" ). count ()
assert n_integers == 10
assert state == [] # main process's state not mutated
Operasi saudara kandung
.amap
menerapkan fungsi async:
import httpx
import asyncio
http_async_client = httpx . AsyncClient ()
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. amap ( http_async_client . get , concurrency = 3 )
. map ( httpx . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
asyncio . get_event_loop (). run_until_complete ( http_async_client . aclose ())
Dekorator fungsi
star
mengubah fungsi yang membawa beberapa argumen posisi menjadi fungsi yang mengambil tuple:
from streamable import star
zeros : Stream [ int ] = (
Stream ( enumerate ( integers ))
. map ( star ( lambda index , integer : index - integer ))
)
assert list ( zeros ) == [ 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
Juga nyaman dengan
.foreach
,.filter
, ...
.foreach
Menerapkan efek samping pada elemen:
self_printing_integers : Stream [ int ] = integers . foreach ( print )
assert list ( self_printing_integers ) == list ( integers ) # triggers the printing
Seperti
.map
ia memiliki parameterconcurrency
opsional.
Suka untuk
.map
, atur parametervia="process"
.
Seperti
.map
itu memiliki saudara kandung.aforeach
operasi untuk async.
.filter
Hanya menyimpan elemen yang memenuhi kondisi:
pair_integers : Stream [ int ] = integers . filter ( lambda n : n % 2 == 0 )
assert list ( pair_integers ) == [ 0 , 2 , 4 , 6 , 8 ]
.throttle
Membatasi jumlah hasil
per_second
/per_minute
/per_hour
:
slow_integers : Stream [ int ] = integers . throttle ( per_second = 5 )
assert list ( slow_integers ) == list ( integers ) # takes 10 / 5 = 2 seconds
dan/atau memastikan
interval
waktu minimum memisahkan hasil yang berurutan:
from datetime import timedelta
slow_integers = integers . throttle ( interval = timedelta ( milliseconds = 100 ))
assert list ( slow_integers ) == list ( integers ) # takes 10 * 0.1 = 1 second
.group
Grup elemen ke dalam
List
s:
integers_5_by_5 : Stream [ List [ int ]] = integers . group ( size = 5 )
assert list ( integers_5_by_5 ) == [[ 0 , 1 , 2 , 3 , 4 ], [ 5 , 6 , 7 , 8 , 9 ]]
integers_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 )
assert list ( integers_by_parity ) == [[ 0 , 2 , 4 , 6 , 8 ], [ 1 , 3 , 5 , 7 , 9 ]]
from datetime import timedelta
integers_within_1s : Stream [ List [ int ]] = (
integers
. throttle ( per_second = 2 )
. group ( interval = timedelta ( seconds = 0.99 ))
)
assert list ( integers_within_1s ) == [[ 0 , 1 , 2 ], [ 3 , 4 ], [ 5 , 6 ], [ 7 , 8 ], [ 9 ]]
Parameter
size
campuran /by
/interval
:
integers_2_by_2_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 , size = 2 )
assert list ( integers_2_by_2_by_parity ) == [[ 0 , 2 ], [ 1 , 3 ], [ 4 , 6 ], [ 5 , 7 ], [ 8 ], [ 9 ]]
.flatten
Unsur -unsur unelir dengan asumsi bahwa mereka
Iterable
.
pair_then_odd_integers : Stream [ int ] = integers_by_parity . flatten ()
assert list ( pair_then_odd_integers ) == [ 0 , 2 , 4 , 6 , 8 , 1 , 3 , 5 , 7 , 9 ]
Flattens
concurrency
iterable secara bersamaan:
mix_of_0s_and_1s : Stream [ int ] = Stream ([[ 0 ] * 4 , [ 1 ] * 4 ]). flatten ( concurrency = 2 )
assert list ( mix_of_0s_and_1s ) == [ 0 , 1 , 0 , 1 , 0 , 1 , 0 , 1 ]
.catch
Menangkap jenis pengecualian tertentu, dan secara opsional menghasilkan nilai
replacement
:
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError , replacement = float ( "inf" ))
)
assert list ( inverses ) == [ float ( "inf" ), 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
Anda dapat menentukan tambahan
when
kondisi untuk tangkapan:
import requests
from requests . exceptions import SSLError
status_codes_ignoring_resolution_errors : Stream [ int ] = (
Stream ([ "https://github.com" , "https://foo.bar" , "https://github.com/foo/bar" ])
. map ( requests . get , concurrency = 2 )
. catch ( SSLError , when = lambda exception : "Max retries exceeded with url" in str ( exception ))
. map ( lambda response : response . status_code )
)
assert list ( status_codes_ignoring_resolution_errors ) == [ 200 , 404 ]
Ini memiliki parameter
finally_raise: bool
untuk menaikkan pengecualian yang ditangkap pertama saat iterasi berakhir.
.truncate
Akhir iterasi Setelah sejumlah elemen telah dihasilkan:
five_first_integers : Stream [ int ] = integers . truncate ( 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
... atau ketika suatu kondisi telah menjadi puas:
five_first_integers : Stream [ int ] = integers . truncate ( when = lambda n : n == 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
.observe
Mencatat kemajuan iterasi selama aliran ini, jika Anda mengulangi:
observed_slow_integers : Stream [ int ] = slow_integers . observe ( "integers" )
Anda akan mendapatkan log ini:
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
Jumlah log tidak akan pernah berlebihan karena mereka diproduksi secara logaritmik misalnya log ke -11 akan diproduksi ketika iterasi mencapai elemen ke -1024.
Peringatan
Ini bisu antara v1.1.0 dan v1.3.1 , silakan pip install --upgrade streamable
zip
Gunakan fungsi
zip
standar:
from streamable import star
cubes : Stream [ int ] = (
Stream ( zip ( integers , integers , integers )) # Stream[Tuple[int, int, int]]
. map ( star ( lambda a , b , c : a * b * c ))
)
assert list ( cubes ) == [ 0 , 1 , 8 , 27 , 64 , 125 , 216 , 343 , 512 , 729 ]
Tolong bantu kami! Merasa sangat diterima di:
.count
Berseram di atas aliran sampai kelelahan dan mengembalikan jumlah elemen yang dihasilkan.
>> > assert integers . count () == 10
Memanggil aliran berulang kali sampai kelelahan dan mengembalikannya.
>> > verbose_integers : Stream [ int ] = integers . foreach ( print )
>> > assert verbose_integers () is verbose_integers
0
1
2
3
4
5
6
7
8
9
Skrip ETL (yaitu skrip mengambil -> pemrosesan -> mendorong data) dapat memperoleh manfaat dari ekspresifitas perpustakaan ini.
Berikut adalah contoh yang dapat Anda copy-paste dan coba (hanya membutuhkan requests
): Ini membuat file CSV yang berisi semua 67 berkaki empat dari generasi ke-1, ke-2 dan ke-3 dari Pokémons (Kudos to Pokéapi)
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
with open ( "./quadruped_pokemons.csv" , mode = "w" ) as file :
fields = [ "id" , "name" , "is_legendary" , "base_happiness" , "capture_rate" ]
writer = csv . DictWriter ( file , fields , extrasaction = 'ignore' )
writer . writeheader ()
(
# Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
Stream ( itertools . count ( 1 ))
# Limits to 16 requests per second to be friendly to our fellow PokéAPI devs
. throttle ( per_second = 16 )
# GETs pokemons concurrently using a pool of 8 threads
. map ( lambda poke_id : f"https://pokeapi.co/api/v2/pokemon-species/ { poke_id } " )
. map ( requests . get , concurrency = 8 )
. foreach ( requests . Response . raise_for_status )
. map ( requests . Response . json )
# Stops the iteration when reaching the 1st pokemon of the 4th generation
. truncate ( when = lambda poke : poke [ "generation" ][ "name" ] == "generation-iv" )
. observe ( "pokemons" )
# Keeps only quadruped Pokemons
. filter ( lambda poke : poke [ "shape" ][ "name" ] == "quadruped" )
. observe ( "quadruped pokemons" )
# Catches errors due to None "generation" or "shape"
. catch (
TypeError ,
when = lambda error : str ( error ) == "'NoneType' object is not subscriptable"
)
# Writes a batch of pokemons every 5 seconds to the CSV file
. group ( interval = timedelta ( seconds = 5 ))
. foreach ( writer . writerows )
. flatten ()
. observe ( "written pokemons" )
# Catches exceptions and raises the 1st one at the end of the iteration
. catch ( finally_raise = True )
# Actually triggers an iteration (the lines above define lazy operations)
. count ()
)
logging . getLogger ( "streamable" ). setLevel ( logging . WARNING ) # default is INFO
Kelas Stream
memaparkan metode .accept
dan Anda dapat mengimplementasikan pengunjung dengan memperluas streamable.visitors.Visitor
kelas abstrak:
from streamable . visitors import Visitor
class DepthVisitor ( Visitor [ int ]):
def visit_stream ( self , stream : Stream ) -> int :
if not stream . upstream :
return 1
return 1 + stream . upstream . accept ( self )
def depth ( stream : Stream ) -> int :
return stream . accept ( DepthVisitor ())
assert depth ( Stream ( range ( 10 )). map ( str ). filter ()) == 3
Metode Stream
juga diekspos sebagai fungsi:
from streamable . functions import catch
inverse_integers : Iterator [ int ] = map ( lambda n : 1 / n , range ( 10 ))
safe_inverse_integers : Iterator [ int ] = catch ( inverse_integers , ZeroDivisionError )
Manfaat dari Build Python 3.13+ yang dilapisi bebas, dijalankan melalui python -X gil=0
.