Python 3 a un support incroyable pour la programmation asynchrone, mais il est sans doute rendu un peu plus difficile à développer des bibliothèques. Êtes-vous fatigué de mettre en œuvre des méthodes synchrones et asynchrones faisant essentiellement la même chose? Cela pourrait être une solution simple pour vous.
pip install synchronicity
Disons que vous avez une fonction asynchrone
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
Et disons (pour une raison quelconque) vous souhaitez offrir une API synchrone aux utilisateurs. Par exemple, vous souhaitez peut-être faciliter l'exécution de votre code dans un script de base, ou un utilisateur construit quelque chose qui est principalement lié au processeur, donc il ne veut pas se soucier d'Asyncio.
Un moyen "simple" de créer un équivalent synchrone consisterait à implémenter un ensemble de fonctions synchrones où tout ce qu'ils font est d'appeler asyncio.run sur une fonction asynchrone. Mais ce n'est pas une excellente solution pour un code plus complexe:
Le dernier cas est particulièrement difficile. Par exemple, disons que vous implémentez un client dans une base de données qui doit avoir une connexion persistante, et que vous souhaitez le construire dans Asyncio:
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
Comment exposer une interface synchrone à ce code? Le problème est que l'enveloppement connect
and query
dans asyncio.run ne fonctionnera pas car vous devez préserver la boucle d'événement entre les appels . Il est clair que nous avons besoin de quelque chose de légèrement plus avancé.
Cette bibliothèque propose une classe Synchronizer
simple qui crée une boucle d'événement sur un thread séparé et enveloppe les fonctions / générateurs / classes afin que l'exécution synchrone se produise sur ce thread. Lorsque vous appelez quoi que ce soit, il détectera si vous courez dans un contexte synchrone ou asynchrone et que vous vous comportez en conséquence.
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
# Running f in a synchronous context blocks until the result is available
ret = f ( 42 ) # Blocks
print ( 'f(42) =' , ret )
async def g ():
# Running f in an asynchronous context works the normal way
ret = await f ( 42 )
print ( 'f(42) =' , ret )
Le décorateur travaille également sur les générateurs:
@ synchronizer . create_blocking
async def f ( n ):
for i in range ( n ):
await asyncio . sleep ( 1.0 )
yield i
# Note that the following runs in a synchronous context
# Each number will take 1s to print
for ret in f ( 10 ):
print ( ret )
Il fonctionne également sur les classes en emballage chaque méthode de la classe:
@ synchronizer . create_blocking
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
# Now we can call it synchronously, if we want to
db_conn = DBConnection ( 'tcp://localhost:1234' )
db_conn . connect ()
data = db_conn . query ( 'select * from foo' )
Vous pouvez également faire en sorte que les fonctions renvoient un Future
objet en ajoutant _future=True
à n'importe quel appel. Cela peut être utile si vous souhaitez envoyer de nombreux appels à partir d'un contexte de blocage, mais que vous souhaitez les résoudre à peu près en parallèle:
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
futures = [ f ( i , _future = True ) for i in range ( 10 )] # This returns immediately
rets = [ fut . result () for fut in futures ] # This should take ~1s to run, resolving all futures in parallel
print ( 'first ten squares:' , rets )
Cette bibliothèque peut également être utile dans des paramètres purement asynchrones, si vous avez plusieurs boucles d'événements, ou si vous avez une section liée au processeur, ou un code critique que vous souhaitez exécuter sur un fil séparé pour la sécurité. Tous les appels à des fonctions / générateurs synchronisés sont en file d'infiltration par conception. Cela en fait une alternative utile à loop.run_in_executor pour des choses simples. Notez cependant que chaque synchroniseur ne fait qu'un seul fil.
Vous pouvez synchroniser les classes de gestion de contexte comme n'importe quelle autre classe et les méthodes spéciales seront gérées correctement.
Il y a aussi un décorateur de fonction @synchronizer.asynccontextmanager
qui se comporte comme https://docs.python.org/3/library/contextlib.html#contextlib.asynccontextManager mais fonctionne dans des contextes synchrones et asynchrones.
C'est du code que j'ai éclaté de projets personnels, et il n'a pas été testé au combat. Il y a une petite suite de test que vous pouvez exécuter à l'aide de PyTest.
Devrait automatiser cela ...
release-XYZ
de la mainXYZ
git tag -a vX.YZ -m "* release bullets"
TWINE_USERNAME=__token__ TWINE_PASSWORD="$PYPI_TOKEN_SYNCHRONICITY" make publish