streamable
Manipulation de flux d'itérables
Un Stream[T]
décore un Iterable[T]
avec une interface fluide permettant le chaînage d'opérations paresseuses.
? Courant | Méthodes de chaîne! |
? Dactylographié | Annulé de type et mypy capable |
? Paresseux | Les opérations sont évaluées paresseusement au moment de l'itération |
Concurrent | via des threads ou des processus ou asyncio |
? ️ robuste | testés par unité pour Python 3,7 à 3,14 avec une couverture à 100% |
? Minimaliste | pip install streamable sans dépendances supplémentaires |
pip install streamable
from streamable import Stream
Instancier un Stream[T]
d'un Iterable[T]
.
integers : Stream [ int ] = Stream ( range ( 10 ))
Stream
sont immuables : l'application d'une opération renvoie un nouveau flux.
Les opérations sont paresseuses : évaluées uniquement au moment de l'itération.
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError )
)
Stream[T]
comme vous le feriez sur toute autre Iterable[T]
. >> > list ( inverses )
[ 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
>> > set ( inverses )
{ 0.5 , 1.0 , 0.2 , 0.33 , 0.25 , 0.17 , 0.14 , 0.12 , 0.11 }
>> > sum ( inverses )
2.82
>> > max ( inverses )
1.0
>> > from functools import reduce
>> > reduce (..., inverses )
>> > for inverse in inverses :
>> > ...
>> > inverses_iter = iter ( inverses )
>> > next ( inverses_iter )
1.0
>> > next ( inverses_iter )
0.5
.map
Applique une transformation sur les éléments:
negative_integer_strings : Stream [ str ] = integers . map ( lambda n : - n ). map ( str )
assert list ( negative_integer_strings ) == [ '0' , '-1' , '-2' , '-3' , '-4' , '-5' , '-6' , '-7' , '-8' , '-9' ]
Applique la transformation via des threads
concurrency
:
import requests
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. map ( requests . get , concurrency = 3 )
. map ( requests . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
Préserve l'ordre en amont par défaut (FIFO), mais vous pouvez définir
ordered=False
pour d'abord d'abord .
concurrency
est également de la taille du tampon contenant des résultats non encore renforcés. Si le tampon est plein, l'itération sur le amont est interrompue jusqu'à ce qu'un résultat soit donné du tampon.
Set
via="process"
:
if __name__ == "__main__" :
state : List [ int ] = []
n_integers : int = integers . map ( state . append , concurrency = 4 , via = "process" ). count ()
assert n_integers == 10
assert state == [] # main process's state not mutated
L'opération de frères et sœurs
.amap
applique une fonction asynchrone:
import httpx
import asyncio
http_async_client = httpx . AsyncClient ()
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. amap ( http_async_client . get , concurrency = 3 )
. map ( httpx . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
asyncio . get_event_loop (). run_until_complete ( http_async_client . aclose ())
Le décorateur de fonction
star
transforme une fonction qui prend plusieurs arguments de position dans une fonction qui prend un tuple:
from streamable import star
zeros : Stream [ int ] = (
Stream ( enumerate ( integers ))
. map ( star ( lambda index , integer : index - integer ))
)
assert list ( zeros ) == [ 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
Aussi pratique avec
.foreach
,.filter
, ...
.foreach
Applique un effet secondaire sur les éléments:
self_printing_integers : Stream [ int ] = integers . foreach ( print )
assert list ( self_printing_integers ) == list ( integers ) # triggers the printing
Comme
.map
il a un paramètreconcurrency
facultatif.
Comme pour
.map
, définissez le paramètrevia="process"
.
Comme
.map
il a une opération de frères et sœurs.aforeach
pour asynchrones.
.filter
Ne garde que les éléments qui satisfont une condition:
pair_integers : Stream [ int ] = integers . filter ( lambda n : n % 2 == 0 )
assert list ( pair_integers ) == [ 0 , 2 , 4 , 6 , 8 ]
.throttle
Limite le nombre de rendements
per_second
/per_minute
/per_hour
:
slow_integers : Stream [ int ] = integers . throttle ( per_second = 5 )
assert list ( slow_integers ) == list ( integers ) # takes 10 / 5 = 2 seconds
et / ou s'assurer qu'un
interval
de temps minimum sépare les rendements successifs:
from datetime import timedelta
slow_integers = integers . throttle ( interval = timedelta ( milliseconds = 100 ))
assert list ( slow_integers ) == list ( integers ) # takes 10 * 0.1 = 1 second
.group
Groupe les éléments dans
List
S:
integers_5_by_5 : Stream [ List [ int ]] = integers . group ( size = 5 )
assert list ( integers_5_by_5 ) == [[ 0 , 1 , 2 , 3 , 4 ], [ 5 , 6 , 7 , 8 , 9 ]]
integers_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 )
assert list ( integers_by_parity ) == [[ 0 , 2 , 4 , 6 , 8 ], [ 1 , 3 , 5 , 7 , 9 ]]
from datetime import timedelta
integers_within_1s : Stream [ List [ int ]] = (
integers
. throttle ( per_second = 2 )
. group ( interval = timedelta ( seconds = 0.99 ))
)
assert list ( integers_within_1s ) == [[ 0 , 1 , 2 ], [ 3 , 4 ], [ 5 , 6 ], [ 7 , 8 ], [ 9 ]]
Mélangez
size
/by
/interval
des paramètres:
integers_2_by_2_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 , size = 2 )
assert list ( integers_2_by_2_by_parity ) == [[ 0 , 2 ], [ 1 , 3 ], [ 4 , 6 ], [ 5 , 7 ], [ 8 ], [ 9 ]]
.flatten
Les éléments de dépassement en supposant qu'ils sont
Iterable
.
pair_then_odd_integers : Stream [ int ] = integers_by_parity . flatten ()
assert list ( pair_then_odd_integers ) == [ 0 , 2 , 4 , 6 , 8 , 1 , 3 , 5 , 7 , 9 ]
Aplaties simultanément
concurrency
: simultanément:
mix_of_0s_and_1s : Stream [ int ] = Stream ([[ 0 ] * 4 , [ 1 ] * 4 ]). flatten ( concurrency = 2 )
assert list ( mix_of_0s_and_1s ) == [ 0 , 1 , 0 , 1 , 0 , 1 , 0 , 1 ]
.catch
Attrape un type d'exceptions donné et donne éventuellement une valeur
replacement
:
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError , replacement = float ( "inf" ))
)
assert list ( inverses ) == [ float ( "inf" ), 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
Vous pouvez spécifier une condition supplémentaire en
when
de capture:
import requests
from requests . exceptions import SSLError
status_codes_ignoring_resolution_errors : Stream [ int ] = (
Stream ([ "https://github.com" , "https://foo.bar" , "https://github.com/foo/bar" ])
. map ( requests . get , concurrency = 2 )
. catch ( SSLError , when = lambda exception : "Max retries exceeded with url" in str ( exception ))
. map ( lambda response : response . status_code )
)
assert list ( status_codes_ignoring_resolution_errors ) == [ 200 , 404 ]
Il a un paramètre
finally_raise: bool
facultatif pour augmenter la première exception captée à la fin de l'itération.
.truncate
Fin une itération une fois qu'un nombre donné d'éléments ont été cédés:
five_first_integers : Stream [ int ] = integers . truncate ( 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
... ou quand une condition est devenue satisfaite:
five_first_integers : Stream [ int ] = integers . truncate ( when = lambda n : n == 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
.observe
Journaux La progression des itérations sur ce flux, si vous itérez sur:
observed_slow_integers : Stream [ int ] = slow_integers . observe ( "integers" )
Vous obtiendrez ces journaux:
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
La quantité de journaux ne sera jamais écrasante car elles sont produites logarithmiquement, par exemple, le 11e journal sera produit lorsque l'itération atteindra le 1024e élément.
Avertissement
Il est muet entre V1.1.0 et V1.3.1 , veuillez pip install --upgrade streamable
zip
Utilisez la fonction
zip
standard:
from streamable import star
cubes : Stream [ int ] = (
Stream ( zip ( integers , integers , integers )) # Stream[Tuple[int, int, int]]
. map ( star ( lambda a , b , c : a * b * c ))
)
assert list ( cubes ) == [ 0 , 1 , 8 , 27 , 64 , 125 , 216 , 343 , 512 , 729 ]
Aidez-nous! Soyez le bienvenu à:
.count
itère sur le flux jusqu'à l'épuisement et renvoie le nombre d'éléments cédés.
>> > assert integers . count () == 10
Appeler le flux itère dessus jusqu'à l'épuisement et le retourne.
>> > verbose_integers : Stream [ int ] = integers . foreach ( print )
>> > assert verbose_integers () is verbose_integers
0
1
2
3
4
5
6
7
8
9
Les scripts ETL (c.-à-d. Les scripts récupérant -> Traitement -> Pushing Data) peuvent bénéficier de l'expressivité de cette bibliothèque.
Voici un exemple que vous pouvez copier-coller et essayer (cela ne nécessite que requests
): il crée un fichier CSV contenant tous les 67 quadrupèdes des 1er, 2e et 3e générations de Pokémons (bravo à Pokéapi)
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
with open ( "./quadruped_pokemons.csv" , mode = "w" ) as file :
fields = [ "id" , "name" , "is_legendary" , "base_happiness" , "capture_rate" ]
writer = csv . DictWriter ( file , fields , extrasaction = 'ignore' )
writer . writeheader ()
(
# Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
Stream ( itertools . count ( 1 ))
# Limits to 16 requests per second to be friendly to our fellow PokéAPI devs
. throttle ( per_second = 16 )
# GETs pokemons concurrently using a pool of 8 threads
. map ( lambda poke_id : f"https://pokeapi.co/api/v2/pokemon-species/ { poke_id } " )
. map ( requests . get , concurrency = 8 )
. foreach ( requests . Response . raise_for_status )
. map ( requests . Response . json )
# Stops the iteration when reaching the 1st pokemon of the 4th generation
. truncate ( when = lambda poke : poke [ "generation" ][ "name" ] == "generation-iv" )
. observe ( "pokemons" )
# Keeps only quadruped Pokemons
. filter ( lambda poke : poke [ "shape" ][ "name" ] == "quadruped" )
. observe ( "quadruped pokemons" )
# Catches errors due to None "generation" or "shape"
. catch (
TypeError ,
when = lambda error : str ( error ) == "'NoneType' object is not subscriptable"
)
# Writes a batch of pokemons every 5 seconds to the CSV file
. group ( interval = timedelta ( seconds = 5 ))
. foreach ( writer . writerows )
. flatten ()
. observe ( "written pokemons" )
# Catches exceptions and raises the 1st one at the end of the iteration
. catch ( finally_raise = True )
# Actually triggers an iteration (the lines above define lazy operations)
. count ()
)
logging . getLogger ( "streamable" ). setLevel ( logging . WARNING ) # default is INFO
La classe Stream
expose une méthode .accept
et vous pouvez implémenter un visiteur en étendant la classe Résumé streamable.visitors.Visitor
:
from streamable . visitors import Visitor
class DepthVisitor ( Visitor [ int ]):
def visit_stream ( self , stream : Stream ) -> int :
if not stream . upstream :
return 1
return 1 + stream . upstream . accept ( self )
def depth ( stream : Stream ) -> int :
return stream . accept ( DepthVisitor ())
assert depth ( Stream ( range ( 10 )). map ( str ). filter ()) == 3
Les méthodes du Stream
sont également exposées en fonction:
from streamable . functions import catch
inverse_integers : Iterator [ int ] = map ( lambda n : 1 / n , range ( 10 ))
safe_inverse_integers : Iterator [ int ] = catch ( inverse_integers , ZeroDivisionError )
Bénéficies Python 3.13+ Freehread Python, exécutez-le via python -X gil=0
.