streamable
Stream-ähnliche Manipulation von iTerables
Ein Stream[T]
dekoriert eine Iterable[T]
mit einer fließenden Schnittstelle, die die Verkettung fauler Operationen ermöglicht.
? Fließend | Kettenmethoden! |
? Tippt | Typ-Annotierter und mypy fähig |
? Faul | Operationen werden bei Iterationszeit faul ausgewertet |
Gleichzeitig | über Threads oder Prozesse oder asyncio |
- robust | Einheit getestet für Python 3,7 bis 3,14 mit 100% Abdeckung |
? Minimalistisch | pip install streamable ohne zusätzliche Abhängigkeiten |
pip install streamable
from streamable import Stream
Instanziieren Sie einen Stream[T]
von einem Iterable[T]
.
integers : Stream [ int ] = Stream ( range ( 10 ))
Stream
S sind unveränderlich : Die Anwendung eines Vorgangs gibt einen neuen Stream zurück.
Die Operationen sind faul : Nur zur Iterationszeit bewertet.
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError )
)
Stream[T]
wie Sie es über alle anderen Iterable[T]
tun würden. >> > list ( inverses )
[ 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
>> > set ( inverses )
{ 0.5 , 1.0 , 0.2 , 0.33 , 0.25 , 0.17 , 0.14 , 0.12 , 0.11 }
>> > sum ( inverses )
2.82
>> > max ( inverses )
1.0
>> > from functools import reduce
>> > reduce (..., inverses )
>> > for inverse in inverses :
>> > ...
>> > inverses_iter = iter ( inverses )
>> > next ( inverses_iter )
1.0
>> > next ( inverses_iter )
0.5
.map
Wendet eine Transformation auf Elemente an:
negative_integer_strings : Stream [ str ] = integers . map ( lambda n : - n ). map ( str )
assert list ( negative_integer_strings ) == [ '0' , '-1' , '-2' , '-3' , '-4' , '-5' , '-6' , '-7' , '-8' , '-9' ]
Wendet die Transformation über
concurrency
an:
import requests
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. map ( requests . get , concurrency = 3 )
. map ( requests . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
Bewahrt die stromaufwärts gelegene Reihenfolge standardmäßig (FIFO), aber Sie können
ordered=False
für das erste Mal festlegen.
concurrency
ist auch die Größe des Puffers, der noch nicht erzählte Ergebnisse enthält. Wenn der Puffer voll ist, wird die Iteration über dem Strombetrieb durchgeführt, bis ein Ergebnis aus dem Puffer ergibt.
Setzen Sie
via="process"
:
if __name__ == "__main__" :
state : List [ int ] = []
n_integers : int = integers . map ( state . append , concurrency = 4 , via = "process" ). count ()
assert n_integers == 10
assert state == [] # main process's state not mutated
Die Geschwisteroperation
.amap
wendet eine asynchronisierte Funktion an:
import httpx
import asyncio
http_async_client = httpx . AsyncClient ()
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. amap ( http_async_client . get , concurrency = 3 )
. map ( httpx . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
asyncio . get_event_loop (). run_until_complete ( http_async_client . aclose ())
Der
star
-Funktionsdekorator verwandelt eine Funktion, die mehrere Positionsargumente in eine Funktion verwandelt, die ein Tupel erfordert:
from streamable import star
zeros : Stream [ int ] = (
Stream ( enumerate ( integers ))
. map ( star ( lambda index , integer : index - integer ))
)
assert list ( zeros ) == [ 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
Auch bequem mit
.foreach
,.filter
, ...
.foreach
Wirkt eine Nebenwirkung auf Elemente an:
self_printing_integers : Stream [ int ] = integers . foreach ( print )
assert list ( self_printing_integers ) == list ( integers ) # triggers the printing
Wie
.map
hat es einen optionalen Parameterconcurrency
.
Legen Sie den Parameter
via="process"
.map
Wie
.map
hat es.aforeach
Geschwister.
.filter
Behält nur die Elemente, die einen Zustand erfüllen:
pair_integers : Stream [ int ] = integers . filter ( lambda n : n % 2 == 0 )
assert list ( pair_integers ) == [ 0 , 2 , 4 , 6 , 8 ]
.throttle
Begrenzt die Anzahl der Ausbeuten
per_second
/per_minute
/per_hour
:
slow_integers : Stream [ int ] = integers . throttle ( per_second = 5 )
assert list ( slow_integers ) == list ( integers ) # takes 10 / 5 = 2 seconds
und/oder sicherstellen, dass ein
interval
aufeinanderfolgende Erträge trennt:
from datetime import timedelta
slow_integers = integers . throttle ( interval = timedelta ( milliseconds = 100 ))
assert list ( slow_integers ) == list ( integers ) # takes 10 * 0.1 = 1 second
.group
Gruppiert Elemente in
List
S:
integers_5_by_5 : Stream [ List [ int ]] = integers . group ( size = 5 )
assert list ( integers_5_by_5 ) == [[ 0 , 1 , 2 , 3 , 4 ], [ 5 , 6 , 7 , 8 , 9 ]]
integers_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 )
assert list ( integers_by_parity ) == [[ 0 , 2 , 4 , 6 , 8 ], [ 1 , 3 , 5 , 7 , 9 ]]
from datetime import timedelta
integers_within_1s : Stream [ List [ int ]] = (
integers
. throttle ( per_second = 2 )
. group ( interval = timedelta ( seconds = 0.99 ))
)
assert list ( integers_within_1s ) == [[ 0 , 1 , 2 ], [ 3 , 4 ], [ 5 , 6 ], [ 7 , 8 ], [ 9 ]]
Mix
size
/by
/interval
:
integers_2_by_2_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 , size = 2 )
assert list ( integers_2_by_2_by_parity ) == [[ 0 , 2 ], [ 1 , 3 ], [ 4 , 6 ], [ 5 , 7 ], [ 8 ], [ 9 ]]
.flatten
Ungruppenelemente unter der Annahme, dass sie
Iterable
s sind.
pair_then_odd_integers : Stream [ int ] = integers_by_parity . flatten ()
assert list ( pair_then_odd_integers ) == [ 0 , 2 , 4 , 6 , 8 , 1 , 3 , 5 , 7 , 9 ]
Flachen
concurrency
iterbare Säle gleichzeitig:
mix_of_0s_and_1s : Stream [ int ] = Stream ([[ 0 ] * 4 , [ 1 ] * 4 ]). flatten ( concurrency = 2 )
assert list ( mix_of_0s_and_1s ) == [ 0 , 1 , 0 , 1 , 0 , 1 , 0 , 1 ]
.catch
Fängt eine bestimmte Art von Ausnahmen ein und ergibt optional einen
replacement
:
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError , replacement = float ( "inf" ))
)
assert list ( inverses ) == [ float ( "inf" ), 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
Sie können eine zusätzliche
when
Bedingung für den Fang angeben:
import requests
from requests . exceptions import SSLError
status_codes_ignoring_resolution_errors : Stream [ int ] = (
Stream ([ "https://github.com" , "https://foo.bar" , "https://github.com/foo/bar" ])
. map ( requests . get , concurrency = 2 )
. catch ( SSLError , when = lambda exception : "Max retries exceeded with url" in str ( exception ))
. map ( lambda response : response . status_code )
)
assert list ( status_codes_ignoring_resolution_errors ) == [ 200 , 404 ]
Es hat eine optionale
finally_raise: bool
Ausnahme, wenn die Iteration endet.
.truncate
Beendet Iteration, sobald eine bestimmte Anzahl von Elementen ergeben wurde:
five_first_integers : Stream [ int ] = integers . truncate ( 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
... oder wenn ein Zustand zufrieden geworden ist:
five_first_integers : Stream [ int ] = integers . truncate ( when = lambda n : n == 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
.observe
Loggen Sie den Fortschritt der Iterationen über diesen Stream an, wenn Sie iterieren:
observed_slow_integers : Stream [ int ] = slow_integers . observe ( "integers" )
Sie erhalten diese Protokolle:
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
Die Anzahl der Protokolle wird niemals überwältigend sein, da sie logarithmisch erzeugt werden, z. B. wird das 11. Protokoll erzeugt, wenn die Iteration das 1024. Element erreicht.
Warnung
Es ist stumm zwischen v1.1.0 und v1.3.1 , bitte pip install --upgrade streamable
zip
Verwenden Sie die Standard
zip
-Funktion:
from streamable import star
cubes : Stream [ int ] = (
Stream ( zip ( integers , integers , integers )) # Stream[Tuple[int, int, int]]
. map ( star ( lambda a , b , c : a * b * c ))
)
assert list ( cubes ) == [ 0 , 1 , 8 , 27 , 64 , 125 , 216 , 343 , 512 , 729 ]
Bitte helfen Sie uns! Fühlen Sie sich sehr willkommen zu:
.count
>> > assert integers . count () == 10
Das Aufrufen des Streams iteriert ihn bis zur Erschöpfung und gibt ihn zurück.
>> > verbose_integers : Stream [ int ] = integers . foreach ( print )
>> > assert verbose_integers () is verbose_integers
0
1
2
3
4
5
6
7
8
9
ETL -Skripte (IE -Skripte abrufen -> Verarbeitung -> Daten pushen) können von der Expressivität dieser Bibliothek profitieren.
Hier ist ein Beispiel, das Sie kopieren und versuchen können (es erfordert nur requests
): Es erstellt eine CSV-Datei, die alle 67 Quadrupeds aus dem 1., 2. und 3. Generationen von Pokémons (Lob an Pokéapi) enthält (ein großes Lob an Pokéapi).
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
with open ( "./quadruped_pokemons.csv" , mode = "w" ) as file :
fields = [ "id" , "name" , "is_legendary" , "base_happiness" , "capture_rate" ]
writer = csv . DictWriter ( file , fields , extrasaction = 'ignore' )
writer . writeheader ()
(
# Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
Stream ( itertools . count ( 1 ))
# Limits to 16 requests per second to be friendly to our fellow PokéAPI devs
. throttle ( per_second = 16 )
# GETs pokemons concurrently using a pool of 8 threads
. map ( lambda poke_id : f"https://pokeapi.co/api/v2/pokemon-species/ { poke_id } " )
. map ( requests . get , concurrency = 8 )
. foreach ( requests . Response . raise_for_status )
. map ( requests . Response . json )
# Stops the iteration when reaching the 1st pokemon of the 4th generation
. truncate ( when = lambda poke : poke [ "generation" ][ "name" ] == "generation-iv" )
. observe ( "pokemons" )
# Keeps only quadruped Pokemons
. filter ( lambda poke : poke [ "shape" ][ "name" ] == "quadruped" )
. observe ( "quadruped pokemons" )
# Catches errors due to None "generation" or "shape"
. catch (
TypeError ,
when = lambda error : str ( error ) == "'NoneType' object is not subscriptable"
)
# Writes a batch of pokemons every 5 seconds to the CSV file
. group ( interval = timedelta ( seconds = 5 ))
. foreach ( writer . writerows )
. flatten ()
. observe ( "written pokemons" )
# Catches exceptions and raises the 1st one at the end of the iteration
. catch ( finally_raise = True )
# Actually triggers an iteration (the lines above define lazy operations)
. count ()
)
logging . getLogger ( "streamable" ). setLevel ( logging . WARNING ) # default is INFO
Die Stream
-Klasse enthüllt eine .accept
Akzeptanzmethode und Sie können einen Besucher durch Erweiterung der streamable.visitors.Visitor
STRACTRACT CLASSE:
from streamable . visitors import Visitor
class DepthVisitor ( Visitor [ int ]):
def visit_stream ( self , stream : Stream ) -> int :
if not stream . upstream :
return 1
return 1 + stream . upstream . accept ( self )
def depth ( stream : Stream ) -> int :
return stream . accept ( DepthVisitor ())
assert depth ( Stream ( range ( 10 )). map ( str ). filter ()) == 3
Die Methoden des Stream
sind auch als Funktionen aufgedeckt:
from streamable . functions import catch
inverse_integers : Iterator [ int ] = map ( lambda n : 1 / n , range ( 10 ))
safe_inverse_integers : Iterator [ int ] = catch ( inverse_integers , ZeroDivisionError )
Vorteile von freien Python 3.13+ Builds, laufen Sie über python -X gil=0
.