streamable
Потокоподобные манипуляции с итерами
Stream[T]
украшает Iterable[T]
с свободным интерфейсом, позволяющим цепочке ленивых операций.
? Беглый | Методы цепи! |
? Напечатано | Анонсированная типа и mypy способна |
? Ленивый | Операции лениво оцениваются во время итерации |
Одновременный | через потоки , процессы или asyncio |
? ️ надежный | Протестировано в единице на Python от 3,7 до 3,14 со 100% покрытием |
? Минималистский | pip install streamable без дополнительных зависимостей |
pip install streamable
from streamable import Stream
Создать Stream[T]
из Iterable[T]
.
integers : Stream [ int ] = Stream ( range ( 10 ))
Stream
S не подходит : применение операции возвращает новый поток.
Операции ленивы : оценивается только во время итерации.
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError )
)
Stream[T]
, как и на любом другом Iterable[T]
. >> > list ( inverses )
[ 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
>> > set ( inverses )
{ 0.5 , 1.0 , 0.2 , 0.33 , 0.25 , 0.17 , 0.14 , 0.12 , 0.11 }
>> > sum ( inverses )
2.82
>> > max ( inverses )
1.0
>> > from functools import reduce
>> > reduce (..., inverses )
>> > for inverse in inverses :
>> > ...
>> > inverses_iter = iter ( inverses )
>> > next ( inverses_iter )
1.0
>> > next ( inverses_iter )
0.5
.map
Применяет трансформацию на элементы:
negative_integer_strings : Stream [ str ] = integers . map ( lambda n : - n ). map ( str )
assert list ( negative_integer_strings ) == [ '0' , '-1' , '-2' , '-3' , '-4' , '-5' , '-6' , '-7' , '-8' , '-9' ]
Применяет преобразование через потоки
concurrency
:
import requests
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. map ( requests . get , concurrency = 3 )
. map ( requests . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
Сохраняет вверх по течению порядок по умолчанию (FIFO), но вы можете установить
ordered=False
для сначала сначала .
concurrency
также является размером буфера, содержащего еще не добываемых результатов. Если буфер заполнен, итерация над вверх по течению приостанавливается до тех пор, пока из буфера не будет получен результат.
Установить
via="process"
:
if __name__ == "__main__" :
state : List [ int ] = []
n_integers : int = integers . map ( state . append , concurrency = 4 , via = "process" ). count ()
assert n_integers == 10
assert state == [] # main process's state not mutated
Операция брата
.amap
import httpx
import asyncio
http_async_client = httpx . AsyncClient ()
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. amap ( http_async_client . get , concurrency = 3 )
. map ( httpx . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
asyncio . get_event_loop (). run_until_complete ( http_async_client . aclose ())
Декоратор функции
star
превращает функцию, которая принимает несколько позиционных аргументов в функцию, которая принимает кортеж:
from streamable import star
zeros : Stream [ int ] = (
Stream ( enumerate ( integers ))
. map ( star ( lambda index , integer : index - integer ))
)
assert list ( zeros ) == [ 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
Также удобно с
.foreach
,.filter
, ...
.foreach
Применяет побочный эффект на элементы:
self_printing_integers : Stream [ int ] = integers . foreach ( print )
assert list ( self_printing_integers ) == list ( integers ) # triggers the printing
Как
.map
у него есть необязательный параметрconcurrency
.
Как для
.map
, установите параметрvia="process"
.
Как
.map
у него есть операция с братом.aforeach
для Async.
.filter
Сохраняет только элементы, которые удовлетворяют условию:
pair_integers : Stream [ int ] = integers . filter ( lambda n : n % 2 == 0 )
assert list ( pair_integers ) == [ 0 , 2 , 4 , 6 , 8 ]
.throttle
Ограничивает количество доходности
per_second
/per_minute
/per_hour
:
slow_integers : Stream [ int ] = integers . throttle ( per_second = 5 )
assert list ( slow_integers ) == list ( integers ) # takes 10 / 5 = 2 seconds
и/или убедитесь, что минимальный
interval
времени отделяет последовательные урожаи:
from datetime import timedelta
slow_integers = integers . throttle ( interval = timedelta ( milliseconds = 100 ))
assert list ( slow_integers ) == list ( integers ) # takes 10 * 0.1 = 1 second
.group
Группы элементы в
List
S:
integers_5_by_5 : Stream [ List [ int ]] = integers . group ( size = 5 )
assert list ( integers_5_by_5 ) == [[ 0 , 1 , 2 , 3 , 4 ], [ 5 , 6 , 7 , 8 , 9 ]]
integers_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 )
assert list ( integers_by_parity ) == [[ 0 , 2 , 4 , 6 , 8 ], [ 1 , 3 , 5 , 7 , 9 ]]
from datetime import timedelta
integers_within_1s : Stream [ List [ int ]] = (
integers
. throttle ( per_second = 2 )
. group ( interval = timedelta ( seconds = 0.99 ))
)
assert list ( integers_within_1s ) == [[ 0 , 1 , 2 ], [ 3 , 4 ], [ 5 , 6 ], [ 7 , 8 ], [ 9 ]]
Миксель
size
/by
/interval
параметры:
integers_2_by_2_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 , size = 2 )
assert list ( integers_2_by_2_by_parity ) == [[ 0 , 2 ], [ 1 , 3 ], [ 4 , 6 ], [ 5 , 7 ], [ 8 ], [ 9 ]]
.flatten
Элементы Ungroups, предполагающие, что они
Iterable
с.
pair_then_odd_integers : Stream [ int ] = integers_by_parity . flatten ()
assert list ( pair_then_odd_integers ) == [ 0 , 2 , 4 , 6 , 8 , 1 , 3 , 5 , 7 , 9 ]
Одновременно сглаживает
concurrency
итерации:
mix_of_0s_and_1s : Stream [ int ] = Stream ([[ 0 ] * 4 , [ 1 ] * 4 ]). flatten ( concurrency = 2 )
assert list ( mix_of_0s_and_1s ) == [ 0 , 1 , 0 , 1 , 0 , 1 , 0 , 1 ]
.catch
Поймает заданный тип исключений и при желании дает значение
replacement
:
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError , replacement = float ( "inf" ))
)
assert list ( inverses ) == [ float ( "inf" ), 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
Вы можете указать дополнительное,
when
условие для улова:
import requests
from requests . exceptions import SSLError
status_codes_ignoring_resolution_errors : Stream [ int ] = (
Stream ([ "https://github.com" , "https://foo.bar" , "https://github.com/foo/bar" ])
. map ( requests . get , concurrency = 2 )
. catch ( SSLError , when = lambda exception : "Max retries exceeded with url" in str ( exception ))
. map ( lambda response : response . status_code )
)
assert list ( status_codes_ignoring_resolution_errors ) == [ 200 , 404 ]
У него есть необязательный параметр
finally_raise: bool
чтобы поднять первое завоеванное исключение, когда итерация заканчивается.
.truncate
Заканчивается итерация после того, как заданное количество элементов было дано:
five_first_integers : Stream [ int ] = integers . truncate ( 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
... или когда условие стало удовлетворенным:
five_first_integers : Stream [ int ] = integers . truncate ( when = lambda n : n == 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
.observe
Зарегистрирует прогресс итераций по этому потоку, если вы выполните действие:
observed_slow_integers : Stream [ int ] = slow_integers . observe ( "integers" )
Вы получите эти журналы:
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
Количество журналов никогда не будет подавляющим, потому что они производятся логарифмически, например, 11 -й журнал будет создаваться, когда итерация достигнет 1024 -го элемента.
Предупреждение
Это немой между v1.1.0 и v1.3.1 , пожалуйста, pip install --upgrade streamable
zip
Используйте стандартную функцию
zip
:
from streamable import star
cubes : Stream [ int ] = (
Stream ( zip ( integers , integers , integers )) # Stream[Tuple[int, int, int]]
. map ( star ( lambda a , b , c : a * b * c ))
)
assert list ( cubes ) == [ 0 , 1 , 8 , 27 , 64 , 125 , 216 , 343 , 512 , 729 ]
Пожалуйста, помогите нам! Чувствуйте себя очень добро пожаловать:
.count
итерации по потоку до исчерпания и возвращает подсчет элементов.
>> > assert integers . count () == 10
Вызов потока итерации на него до исчерпания и возвращает его.
>> > verbose_integers : Stream [ int ] = integers . foreach ( print )
>> > assert verbose_integers () is verbose_integers
0
1
2
3
4
5
6
7
8
9
Сценарии ETL (то есть сценарии извлечения -> Обработка -> Нажатие данных) могут извлечь выгоду из выразительности этой библиотеки.
Вот пример, который вы можете скопировать и попробовать (это требует только requests
): он создает файл CSV, содержащий все 67 четвероногих из 1-го, 2-го и 3-го поколений Pokémons (слава To Pokéapi)
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
with open ( "./quadruped_pokemons.csv" , mode = "w" ) as file :
fields = [ "id" , "name" , "is_legendary" , "base_happiness" , "capture_rate" ]
writer = csv . DictWriter ( file , fields , extrasaction = 'ignore' )
writer . writeheader ()
(
# Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
Stream ( itertools . count ( 1 ))
# Limits to 16 requests per second to be friendly to our fellow PokéAPI devs
. throttle ( per_second = 16 )
# GETs pokemons concurrently using a pool of 8 threads
. map ( lambda poke_id : f"https://pokeapi.co/api/v2/pokemon-species/ { poke_id } " )
. map ( requests . get , concurrency = 8 )
. foreach ( requests . Response . raise_for_status )
. map ( requests . Response . json )
# Stops the iteration when reaching the 1st pokemon of the 4th generation
. truncate ( when = lambda poke : poke [ "generation" ][ "name" ] == "generation-iv" )
. observe ( "pokemons" )
# Keeps only quadruped Pokemons
. filter ( lambda poke : poke [ "shape" ][ "name" ] == "quadruped" )
. observe ( "quadruped pokemons" )
# Catches errors due to None "generation" or "shape"
. catch (
TypeError ,
when = lambda error : str ( error ) == "'NoneType' object is not subscriptable"
)
# Writes a batch of pokemons every 5 seconds to the CSV file
. group ( interval = timedelta ( seconds = 5 ))
. foreach ( writer . writerows )
. flatten ()
. observe ( "written pokemons" )
# Catches exceptions and raises the 1st one at the end of the iteration
. catch ( finally_raise = True )
# Actually triggers an iteration (the lines above define lazy operations)
. count ()
)
logging . getLogger ( "streamable" ). setLevel ( logging . WARNING ) # default is INFO
Класс Stream
раскрывает метод .accept
, и вы можете реализовать посетителя , расширяя streamable.visitors.Visitor
from streamable . visitors import Visitor
class DepthVisitor ( Visitor [ int ]):
def visit_stream ( self , stream : Stream ) -> int :
if not stream . upstream :
return 1
return 1 + stream . upstream . accept ( self )
def depth ( stream : Stream ) -> int :
return stream . accept ( DepthVisitor ())
assert depth ( Stream ( range ( 10 )). map ( str ). filter ()) == 3
Методы Stream
также выявляются как функции:
from streamable . functions import catch
inverse_integers : Iterator [ int ] = map ( lambda n : 1 / n , range ( 10 ))
safe_inverse_integers : Iterator [ int ] = catch ( inverse_integers , ZeroDivisionError )
Преимущества свободных сборочных сборок Python 3.13+, запустите через python -X gil=0
.