streamable
반복의 스트림과 같은 조작
Stream[T]
유창한 인터페이스를 사용하여 Iterable[T]
장식하여 게으른 작업의 사슬을 가능하게합니다.
? 유창한 | 체인 방법! |
? 타이핑 | 유형 공개 및 mypy Able |
? 게으른 | 운영은 반복 시간에 게으르게 평가 됩니다 |
경쟁 상대 | 스레드 또는 프로세스 또는 asyncio 통해 |
? ️ 강력한 | 파이썬 3.7 ~ 3.14 의 경우 100% 적용 범위로 단위 테스트 |
? 미니멀리스트 | pip install streamable 추가 종속성없이 스트림을 설치할 수 있습니다 |
pip install streamable
from streamable import Stream
Iterable[T]
에서 Stream[T]
인스턴스화하십시오.
integers : Stream [ int ] = Stream ( range ( 10 ))
Stream
은 불변 입니다. 작업을 적용하면 새 스트림을 반환합니다.
운영은 게으른 다 : 반복 시간에만 평가된다.
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError )
)
Iterable[T]
와 마찬가지로 Stream[T]
반복하십시오. >> > list ( inverses )
[ 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
>> > set ( inverses )
{ 0.5 , 1.0 , 0.2 , 0.33 , 0.25 , 0.17 , 0.14 , 0.12 , 0.11 }
>> > sum ( inverses )
2.82
>> > max ( inverses )
1.0
>> > from functools import reduce
>> > reduce (..., inverses )
>> > for inverse in inverses :
>> > ...
>> > inverses_iter = iter ( inverses )
>> > next ( inverses_iter )
1.0
>> > next ( inverses_iter )
0.5
.map
요소에 변환을 적용합니다.
negative_integer_strings : Stream [ str ] = integers . map ( lambda n : - n ). map ( str )
assert list ( negative_integer_strings ) == [ '0' , '-1' , '-2' , '-3' , '-4' , '-5' , '-6' , '-7' , '-8' , '-9' ]
concurrency
스레드를 통해 변환을 적용합니다.
import requests
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. map ( requests . get , concurrency = 3 )
. map ( requests . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
기본적으로 업스트림 순서를 보존하지만 (FIFO)는 먼저 완료된 경우
ordered=False
설정할 수 있습니다.
concurrency
또한 수확하지 않은 결과를 포함하는 버퍼의 크기입니다. 버퍼가 가득 차면, 버퍼에서 결과가 산출 될 때까지 상류의 반복이 일시 중지됩니다 .
via="process"
설정 :
if __name__ == "__main__" :
state : List [ int ] = []
n_integers : int = integers . map ( state . append , concurrency = 4 , via = "process" ). count ()
assert n_integers == 10
assert state == [] # main process's state not mutated
형제 자매 작업
.amap
은 비동기 기능을 적용합니다.
import httpx
import asyncio
http_async_client = httpx . AsyncClient ()
pokemon_names : Stream [ str ] = (
Stream ( range ( 1 , 4 ))
. map ( lambda i : f"https://pokeapi.co/api/v2/pokemon-species/ { i } " )
. amap ( http_async_client . get , concurrency = 3 )
. map ( httpx . Response . json )
. map ( lambda poke : poke [ "name" ])
)
assert list ( pokemon_names ) == [ 'bulbasaur' , 'ivysaur' , 'venusaur' ]
asyncio . get_event_loop (). run_until_complete ( http_async_client . aclose ())
star
Function Decorator는 여러 위치 인수를 튜플을 취하는 함수로 변환합니다.
from streamable import star
zeros : Stream [ int ] = (
Stream ( enumerate ( integers ))
. map ( star ( lambda index , integer : index - integer ))
)
assert list ( zeros ) == [ 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
.foreach
,.filter
, ...
.foreach
요소에 부작용을 적용합니다.
self_printing_integers : Stream [ int ] = integers . foreach ( print )
assert list ( self_printing_integers ) == list ( integers ) # triggers the printing
.map
과 마찬가지로 옵션concurrency
매개 변수가 있습니다.
.map
과 마찬가지로via="process"
매개 변수를 설정하십시오.
.map
과 마찬가지로 Async에 대한 형제..aforeach
작업이 있습니다.
.filter
조건을 만족시키는 요소 만 유지합니다.
pair_integers : Stream [ int ] = integers . filter ( lambda n : n % 2 == 0 )
assert list ( pair_integers ) == [ 0 , 2 , 4 , 6 , 8 ]
.throttle
per_second
/per_minute
/per_hour
의 수율을 제한합니다.
slow_integers : Stream [ int ] = integers . throttle ( per_second = 5 )
assert list ( slow_integers ) == list ( integers ) # takes 10 / 5 = 2 seconds
및/또는 최소 시간
interval
연속적인 수율을 분리하는지 확인하십시오.
from datetime import timedelta
slow_integers = integers . throttle ( interval = timedelta ( milliseconds = 100 ))
assert list ( slow_integers ) == list ( integers ) # takes 10 * 0.1 = 1 second
.group
List
에 요소를 그룹화합니다.
integers_5_by_5 : Stream [ List [ int ]] = integers . group ( size = 5 )
assert list ( integers_5_by_5 ) == [[ 0 , 1 , 2 , 3 , 4 ], [ 5 , 6 , 7 , 8 , 9 ]]
integers_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 )
assert list ( integers_by_parity ) == [[ 0 , 2 , 4 , 6 , 8 ], [ 1 , 3 , 5 , 7 , 9 ]]
from datetime import timedelta
integers_within_1s : Stream [ List [ int ]] = (
integers
. throttle ( per_second = 2 )
. group ( interval = timedelta ( seconds = 0.99 ))
)
assert list ( integers_within_1s ) == [[ 0 , 1 , 2 ], [ 3 , 4 ], [ 5 , 6 ], [ 7 , 8 ], [ 9 ]]
믹스
size
/by
/interval
매개 변수 :
integers_2_by_2_by_parity : Stream [ List [ int ]] = integers . group ( by = lambda n : n % 2 , size = 2 )
assert list ( integers_2_by_2_by_parity ) == [[ 0 , 2 ], [ 1 , 3 ], [ 4 , 6 ], [ 5 , 7 ], [ 8 ], [ 9 ]]
.flatten
Iterable
있다고 가정하는 요소를 그룹화하지 않습니다.
pair_then_odd_integers : Stream [ int ] = integers_by_parity . flatten ()
assert list ( pair_then_odd_integers ) == [ 0 , 2 , 4 , 6 , 8 , 1 , 3 , 5 , 7 , 9 ]
동시에
concurrency
반복성을 동시에 :
mix_of_0s_and_1s : Stream [ int ] = Stream ([[ 0 ] * 4 , [ 1 ] * 4 ]). flatten ( concurrency = 2 )
assert list ( mix_of_0s_and_1s ) == [ 0 , 1 , 0 , 1 , 0 , 1 , 0 , 1 ]
.catch
주어진 유형의 예외를 포착하고 선택적으로
replacement
값을 산출합니다.
inverses : Stream [ float ] = (
integers
. map ( lambda n : round ( 1 / n , 2 ))
. catch ( ZeroDivisionError , replacement = float ( "inf" ))
)
assert list ( inverses ) == [ float ( "inf" ), 1.0 , 0.5 , 0.33 , 0.25 , 0.2 , 0.17 , 0.14 , 0.12 , 0.11 ]
캐치의 추가
when
을 지정할 수 있습니다.
import requests
from requests . exceptions import SSLError
status_codes_ignoring_resolution_errors : Stream [ int ] = (
Stream ([ "https://github.com" , "https://foo.bar" , "https://github.com/foo/bar" ])
. map ( requests . get , concurrency = 2 )
. catch ( SSLError , when = lambda exception : "Max retries exceeded with url" in str ( exception ))
. map ( lambda response : response . status_code )
)
assert list ( status_codes_ignoring_resolution_errors ) == [ 200 , 404 ]
반복이 끝날 때 첫 번째 포획 예외를 올리기 위해 옵션 옵션
finally_raise: bool
매개 변수가 있습니다.
.truncate
끝 반복 주어진 수의 요소가 양보 된 후에 :
five_first_integers : Stream [ int ] = integers . truncate ( 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
... 또는 상태가 만족되면 :
five_first_integers : Stream [ int ] = integers . truncate ( when = lambda n : n == 5 )
assert list ( five_first_integers ) == [ 0 , 1 , 2 , 3 , 4 ]
.observe
반복하는 경우이 스트림에서 반복의 진행 상황을 기록합니다.
observed_slow_integers : Stream [ int ] = slow_integers . observe ( "integers" )
이 로그를 얻을 수 있습니다.
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
로그의 양은 반복이 1024 번째 요소에 도달하면 11 번째 로그가 생성되기 때문에 압도적이지 않을 것입니다.
경고
v1.1.0 과 v1.3.1 사이의 음소거입니다. pip install --upgrade streamable
zip
표준
zip
기능 사용 :
from streamable import star
cubes : Stream [ int ] = (
Stream ( zip ( integers , integers , integers )) # Stream[Tuple[int, int, int]]
. map ( star ( lambda a , b , c : a * b * c ))
)
assert list ( cubes ) == [ 0 , 1 , 8 , 27 , 64 , 125 , 216 , 343 , 512 , 729 ]
제발 도와주세요! 매우 환영합니다.
.count
소진이 발생할 때까지 스트림을 반복하고 요소의 수를 반환합니다.
>> > assert integers . count () == 10
스트림을 호출하는 것은 소진 될 때까지 반복하여 반복합니다.
>> > verbose_integers : Stream [ int ] = integers . foreach ( print )
>> > assert verbose_integers () is verbose_integers
0
1
2
3
4
5
6
7
8
9
ETL 스크립트 (예 : 스크립트 페치 -> 처리 -> 푸시 데이터)는이 라이브러리의 표현력으로부터 이익을 얻을 수 있습니다.
다음은 다음은 복사하여 페이스트하고 시도 할 수있는 예입니다 ( requests
만 필요합니다) : 1 차, 2 차 및 3 세대 Pokémons (Kudos to Pokéapi)의 67 개의 4 배를 모두 포함하는 CSV 파일을 만듭니다.
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
with open ( "./quadruped_pokemons.csv" , mode = "w" ) as file :
fields = [ "id" , "name" , "is_legendary" , "base_happiness" , "capture_rate" ]
writer = csv . DictWriter ( file , fields , extrasaction = 'ignore' )
writer . writeheader ()
(
# Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
Stream ( itertools . count ( 1 ))
# Limits to 16 requests per second to be friendly to our fellow PokéAPI devs
. throttle ( per_second = 16 )
# GETs pokemons concurrently using a pool of 8 threads
. map ( lambda poke_id : f"https://pokeapi.co/api/v2/pokemon-species/ { poke_id } " )
. map ( requests . get , concurrency = 8 )
. foreach ( requests . Response . raise_for_status )
. map ( requests . Response . json )
# Stops the iteration when reaching the 1st pokemon of the 4th generation
. truncate ( when = lambda poke : poke [ "generation" ][ "name" ] == "generation-iv" )
. observe ( "pokemons" )
# Keeps only quadruped Pokemons
. filter ( lambda poke : poke [ "shape" ][ "name" ] == "quadruped" )
. observe ( "quadruped pokemons" )
# Catches errors due to None "generation" or "shape"
. catch (
TypeError ,
when = lambda error : str ( error ) == "'NoneType' object is not subscriptable"
)
# Writes a batch of pokemons every 5 seconds to the CSV file
. group ( interval = timedelta ( seconds = 5 ))
. foreach ( writer . writerows )
. flatten ()
. observe ( "written pokemons" )
# Catches exceptions and raises the 1st one at the end of the iteration
. catch ( finally_raise = True )
# Actually triggers an iteration (the lines above define lazy operations)
. count ()
)
logging . getLogger ( "streamable" ). setLevel ( logging . WARNING ) # default is INFO
Stream
클래스는 .accept
메소드를 노출시키고 streamable.visitors.Visitor
Abstract 클래스를 확장하여 방문자를 구현할 수 있습니다.
from streamable . visitors import Visitor
class DepthVisitor ( Visitor [ int ]):
def visit_stream ( self , stream : Stream ) -> int :
if not stream . upstream :
return 1
return 1 + stream . upstream . accept ( self )
def depth ( stream : Stream ) -> int :
return stream . accept ( DepthVisitor ())
assert depth ( Stream ( range ( 10 )). map ( str ). filter ()) == 3
Stream
의 방법은 또한 함수로 노출됩니다.
from streamable . functions import catch
inverse_integers : Iterator [ int ] = map ( lambda n : 1 / n , range ( 10 ))
safe_inverse_integers : Iterator [ int ] = catch ( inverse_integers , ZeroDivisionError )
프리 스레드 파이썬 3.13+ 빌드의 혜택, python -X gil=0
통해 실행됩니다.