O Python 3 tem um apoio incrível para a programação assíncrona, mas sem dúvida tornou um pouco mais difícil desenvolver bibliotecas. Você está cansado de implementar métodos síncronos e assíncronos fazendo basicamente a mesma coisa? Esta pode ser uma solução simples para você.
pip install synchronicity
Digamos que você tenha uma função assíncrona
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
E digamos (por qualquer motivo) que você queira oferecer uma API síncrona aos usuários. Por exemplo, talvez você queira facilitar a execução de seu código em um script básico, ou um usuário está criando algo que é principalmente ligado à CPU, para que eles não queiram se preocupar com o Asycio.
Uma maneira "simples" de criar um equivalente síncrono seria implementar um conjunto de funções síncronas onde tudo o que eles fazem é chamar asyncio.run em uma função assíncrona. Mas esta não é uma ótima solução para um código mais complexo:
O último caso é particularmente desafiador. Por exemplo, digamos que você esteja implementando um cliente para um banco de dados que precisa ter uma conexão persistente e deseja construí -lo no Asycio:
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
Como você expõe uma interface síncrona a este código? O problema é que o embrulho connect
e query
no Asyncio.run não funcionará, pois você precisa preservar o loop de eventos nas chamadas . É claro que precisamos de algo um pouco mais avançado.
Esta biblioteca oferece uma classe Synchronizer
simples que cria um loop de eventos em um encadeamento separado e envolve funções/geradores/classes para que a execução síncrona aconteça nesse encadeamento. Quando você chama qualquer coisa, ele detectará se você estiver executando em um contexto síncrono ou assíncrono e se comportará correspondentemente.
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
# Running f in a synchronous context blocks until the result is available
ret = f ( 42 ) # Blocks
print ( 'f(42) =' , ret )
async def g ():
# Running f in an asynchronous context works the normal way
ret = await f ( 42 )
print ( 'f(42) =' , ret )
O decorador também funciona em geradores:
@ synchronizer . create_blocking
async def f ( n ):
for i in range ( n ):
await asyncio . sleep ( 1.0 )
yield i
# Note that the following runs in a synchronous context
# Each number will take 1s to print
for ret in f ( 10 ):
print ( ret )
Ele também opera em aulas envolvendo todos os métodos da classe:
@ synchronizer . create_blocking
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
# Now we can call it synchronously, if we want to
db_conn = DBConnection ( 'tcp://localhost:1234' )
db_conn . connect ()
data = db_conn . query ( 'select * from foo' )
Você também pode fazer com que as funções retornem um objeto Future
adicionando _future=True
a qualquer chamada. Isso pode ser útil se você deseja despachar muitas chamadas de um contexto de bloqueio, mas deseja resolvê -las aproximadamente em paralelo:
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
futures = [ f ( i , _future = True ) for i in range ( 10 )] # This returns immediately
rets = [ fut . result () for fut in futures ] # This should take ~1s to run, resolving all futures in parallel
print ( 'first ten squares:' , rets )
Esta biblioteca também pode ser útil em configurações puramente assíncronas, se você tiver vários loops de eventos ou se tiver alguma seção que seja ligada à CPU ou em algum código crítico que deseja executar em um thread separado para segurança. Todas as chamadas para funções/geradores sincronizados são seguros por threads por design. Isso o torna uma alternativa útil para loop.run_in_executor para coisas simples. Observe, no entanto, que cada sincronizador executa apenas um thread.
Você pode sincronizar as classes do Gerenciador de contexto como qualquer outra classe e os métodos especiais serão tratados corretamente.
Há também um decorador de funções @synchronizer.asynccontextmanager
que se comporta exatamente como https://docs.python.org/3/library/contextlib.html#contextlib.asynccontextmanager, mas trabalha em contextos síncronos e síncronos e assonetos.
Este é o código que eu quebrei de projetos pessoais e não foi testado em batalha. Há uma pequena suíte de teste que você pode executar usando pytest.
Deve automatizar isso ...
release-XYZ
da MainXYZ
git tag -a vX.YZ -m "* release bullets"
TWINE_USERNAME=__token__ TWINE_PASSWORD="$PYPI_TOKEN_SYNCHRONICITY" make publish