Python 3对异步编程有一些惊人的支持,但可以说这使开发库变得更加困难。您是否厌倦了实施基本同样事情的同步和异步方法?这可能是您的简单解决方案。
pip install synchronicity
假设您具有异步功能
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
假设(无论出于何种原因)您想向用户提供同步API。例如,也许您想使在基本脚本中运行代码变得容易,或者用户正在构建大多数是CPU的东西,因此他们不想打扰Asyncio。
创建同步等效的一种“简单”方法是实现一组同步函数,在这些函数中,他们所做的就是在异步函数上称为asyncio.run。但这不是更复杂的代码的绝佳解决方案:
最后一个案例特别具有挑战性。例如,假设您正在将客户端实现到需要具有持久连接的数据库,并且您想在Asyncio:
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
您如何将同步接口公开此代码?问题在于,在asyncio.lun中包装connect
和query
不起作用,因为您需要跨呼叫保留事件循环。很明显,我们需要一些更先进的东西。
该库提供了一个简单的Synchronizer
类,该类在单独的线程上创建事件循环,并包装功能/生成器/类,以便在该线程上进行同步执行。当您调用任何内容时,它将检测到您是在同步或异步上下文中运行的,并相应地行事。
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
# Running f in a synchronous context blocks until the result is available
ret = f ( 42 ) # Blocks
print ( 'f(42) =' , ret )
async def g ():
# Running f in an asynchronous context works the normal way
ret = await f ( 42 )
print ( 'f(42) =' , ret )
装饰器还适用于发电机:
@ synchronizer . create_blocking
async def f ( n ):
for i in range ( n ):
await asyncio . sleep ( 1.0 )
yield i
# Note that the following runs in a synchronous context
# Each number will take 1s to print
for ret in f ( 10 ):
print ( ret )
它还通过包装课上的所有方法来在课程上运行:
@ synchronizer . create_blocking
class DBConnection :
def __init__ ( self , url ):
self . _url = url
async def connect ( self ):
self . _connection = await connect_to_database ( self . _url )
async def query ( self , q ):
return await self . _connection . run_query ( q )
# Now we can call it synchronously, if we want to
db_conn = DBConnection ( 'tcp://localhost:1234' )
db_conn . connect ()
data = db_conn . query ( 'select * from foo' )
您还可以通过在任何调用中添加_future=True
来使函数返回Future
对象。如果您想从阻塞上下文中派遣许多电话,这可能很有用,但是您想大致并行解决它们:
from synchronicity import Synchronizer
synchronizer = Synchronizer ()
@ synchronizer . create_blocking
async def f ( x ):
await asyncio . sleep ( 1.0 )
return x ** 2
futures = [ f ( i , _future = True ) for i in range ( 10 )] # This returns immediately
rets = [ fut . result () for fut in futures ] # This should take ~1s to run, resolving all futures in parallel
print ( 'first ten squares:' , rets )
如果您有多个事件循环,或者如果您有一些CPU结合的部分,或者您想在单独的线程上以安全安全性运行,则该库也可以在纯粹的异步设置,如果您有多个事件循环或某些关键代码中很有用。所有对同步函数/发电机的调用都是设计螺纹安全的。这使其成为简单事物的loop.run_in_executor的有用替代方法。但是请注意,每个同步器仅运行一个线程。
您可以像其他任何类一样同步上下文管理器类,并且特殊方法将得到正确处理。
还有一个函数装饰@synchronizer.asynccontextmanager
,其行为就像https://docs.python.org/3/library/contextlib.html.html#contextlib.asyncconconconconconconconconconconcontextmanager,但在同步和异音方面都起作用。
这是我从个人项目中介绍的代码,但没有进行战斗测试。有一个小型测试套件,您可以使用Pytest运行。
应该自动化这个...
release-XYZ
XYZ
git tag -a vX.YZ -m "* release bullets"
TWINE_USERNAME=__token__ TWINE_PASSWORD="$PYPI_TOKEN_SYNCHRONICITY" make publish