Python 3 tiene un soporte sorprendente para la programación de Async, pero posiblemente sea un poco más difícil desarrollar bibliotecas. ¿Está cansado de implementar métodos sincrónicos y asincrónicos que hacen básicamente lo mismo? Esta podría ser una solución simple para usted.
pip install synchronicity
Digamos que tienes una función asincrónica
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
Y digamos (por cualquier razón), desea ofrecer una API sincrónica a los usuarios. Por ejemplo, tal vez desee facilitar ejecutar su código en un script básico, o un usuario está construyendo algo que esté en su mayoría unido a CPU, por lo que no quieren molestarse con Asyncio.
Una forma "simple" de crear un equivalente sincrónico sería implementar un conjunto de funciones sincrónicas donde todo lo que hacen es llamar a Asyncio.Run en una función asíncrona. Pero esta no es una gran solución para un código más complejo:
El último caso es particularmente desafiante. Por ejemplo, supongamos que está implementando un cliente en una base de datos que necesita tener una conexión persistente, y desea construirlo en Asyncio:
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
¿Cómo se expone una interfaz sincrónica a este código? El problema es que connect
y query
en asyncio.run no funcionarán ya que necesita preservar el bucle de eventos en las llamadas . Está claro que necesitamos algo un poco más avanzado.
Esta biblioteca ofrece una clase Synchronizer
simple que crea un bucle de evento en un hilo separado y envuelve funciones/generadores/clases para que la ejecución síncrona ocurra en ese hilo. Cuando llame a algo, detectará si se está ejecutando en un contexto sincrónico o asincrónico, y se comporta correspondientemente.
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
# Running f in a synchronous context blocks until the result is available
ret = f ( 42 ) # Blocks
print ( 'f(42) =' , ret )
async def g ():
# Running f in an asynchronous context works the normal way
ret = await f ( 42 )
print ( 'f(42) =' , ret )
El decorador también funciona en generadores:
@ synchronizer . create_blocking
async def f ( n ):
for i in range ( n ):
await asyncio . sleep ( 1.0 )
yield i
# Note that the following runs in a synchronous context
# Each number will take 1s to print
for ret in f ( 10 ):
print ( ret )
También opera en clases envolviendo todos los métodos de la clase:
@ synchronizer . create_blocking
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
# Now we can call it synchronously, if we want to
db_conn = DBConnection ( 'tcp://localhost:1234' )
db_conn . connect ()
data = db_conn . query ( 'select * from foo' )
También puede hacer que las funciones devuelvan un objeto Future
agregando _future=True
a cualquier llamada. Esto puede ser útil si desea enviar muchas llamadas desde un contexto de bloqueo, pero desea resolverlas aproximadamente en paralelo:
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
futures = [ f ( i , _future = True ) for i in range ( 10 )] # This returns immediately
rets = [ fut . result () for fut in futures ] # This should take ~1s to run, resolving all futures in parallel
print ( 'first ten squares:' , rets )
Esta biblioteca también puede ser útil en configuraciones puramente asincrónicas, si tiene múltiples bucles de eventos, o si tiene alguna sección que está unida a CPU, o algún código crítico que desea ejecutar en un hilo separado para la seguridad. Todas las llamadas a las funciones/generadores sincronizados son seguras de hilo por diseño. Esto lo convierte en una alternativa útil a loop.run_in_executor para cosas simples. Sin embargo, tenga en cuenta que cada sincronizador solo ejecuta un hilo.
Puede sincronizar las clases de Context Manager al igual que cualquier otra clase y los métodos especiales se manejarán correctamente.
También hay un decorador de funciones @synchronizer.asynccontextmanager
que se comporta al igual que https://docs.python.org/3/library/contextlib.html#contextlib.asynccontextmanager pero funciona en contextos sincronales y asíncronos.
Este es un código que salí de los proyectos personales, y no se ha probado la batalla. Hay una pequeña suite de prueba que puedes ejecutar usando Pytest.
Debería automatizar esto ...
release-XYZ
de mainXYZ
git tag -a vX.YZ -m "* release bullets"
TWINE_USERNAME=__token__ TWINE_PASSWORD="$PYPI_TOKEN_SYNCHRONICITY" make publish