Python 3には、Asyncプログラミングに対する素晴らしいサポートがありますが、ライブラリを開発するのが少し難しくなっています。基本的に同じことを行う同期および非同期の方法を実装するのにうんざりしていませんか?これはあなたにとって簡単な解決策かもしれません。
pip install synchronicity
非同期関数があるとしましょう
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
そして、(何らかの理由で)ユーザーに同期APIを提供したいとしましょう。たとえば、基本的なスクリプトでコードを簡単に実行できるか、ユーザーがほとんどCPUバウンドのものを構築しているため、Asyncioを気にしたくないかもしれません。
同期等価物を作成する「シンプルな」方法は、非同期関数でAsyncio.runを呼び出すことだけである一連の同期関数を実装することです。しかし、これはより複雑なコードにとって素晴らしいソリューションではありません:
最後のケースは特に挑戦的です。たとえば、永続的な接続が必要なデータベースにクライアントを実装しているとし、Asyncioで構築したいとします。
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
このコードに同期インターフェイスをどのように公開しますか?問題は、コール全体にイベントループを保存する必要があるため、asyncio.runでconnect
とquery
ラッピングすることは機能しないことです。少し高度なものが必要であることは明らかです。
このライブラリは、個別のスレッドでイベントループを作成するシンプルなSynchronizer
クラスを提供し、そのスレッドで同期実行が行われるように関数/ジェネレーター/クラスをラップします。何かを呼び出すと、同期または非同期のコンテキストで実行され、それに応じて動作するかどうかが検出されます。
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
# Running f in a synchronous context blocks until the result is available
ret = f ( 42 ) # Blocks
print ( 'f(42) =' , ret )
async def g ():
# Running f in an asynchronous context works the normal way
ret = await f ( 42 )
print ( 'f(42) =' , ret )
デコレータは発電機でも動作します。
@ synchronizer . create_blocking
async def f ( n ):
for i in range ( n ):
await asyncio . sleep ( 1.0 )
yield i
# Note that the following runs in a synchronous context
# Each number will take 1s to print
for ret in f ( 10 ):
print ( ret )
また、クラス上のすべての方法をラッピングすることにより、クラスで動作します。
@ synchronizer . create_blocking
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
# Now we can call it synchronously, if we want to
db_conn = DBConnection ( 'tcp://localhost:1234' )
db_conn . connect ()
data = db_conn . query ( 'select * from foo' )
また、 _future=True
任意の呼び出しに追加することにより、futureオブジェクトをFuture
オブジェクトに返すようにすることもできます。これは、ブロッキングコンテキストから多くの呼び出しを派遣する場合に役立ちますが、それらを大まかに並行して解決する必要があります。
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
futures = [ f ( i , _future = True ) for i in range ( 10 )] # This returns immediately
rets = [ fut . result () for fut in futures ] # This should take ~1s to run, resolving all futures in parallel
print ( 'first ten squares:' , rets )
このライブラリは、複数のイベントループがある場合、またはCPUバインドのセクションがある場合、または安全のために別のスレッドで実行したい重要なコードがある場合、純粋に非同期設定でも役立ちます。同期された関数/ジェネレーターへのすべての呼び出しは、設計によりスレッドセーフです。これにより、単純なもののためのloop.run_in_executorの便利な代替手段になります。ただし、各シンクロナイザーは1つのスレッドのみを実行することに注意してください。
他のクラスと同様に、コンテキストマネージャーのクラスを同期することができ、特別な方法は適切に処理されます。
また、https://docs.python.org/3/library/contextlib.html#contextlib.asynccontextmanagerのように振る舞う@synchronizer.asynccontextmanager
も機能します。
これは私が個人的なプロジェクトから抜け出したコードであり、戦闘テストされていません。 Pytestを使用して実行できる小さなテストスイートがあります。
これを自動化する必要があります...
release-XYZ
作成しますXYZ
git tag -a vX.YZ -m "* release bullets"
TWINE_USERNAME=__token__ TWINE_PASSWORD="$PYPI_TOKEN_SYNCHRONICITY" make publish