请参阅坚韧文档以获得更好的体验。
Tenacity 是一个 Apache 2.0 许可的通用重试库,用 Python 编写,用于简化向任何事物添加重试行为的任务。它源自重试分叉,遗憾的是该分叉已不再维护。 Tenacity 与重试的 api 不兼容,但添加了重要的新功能并修复了许多长期存在的错误。
最简单的用例是每当发生异常时重试片状函数,直到返回值。
..测试代码:: 随机导入 从坚韧导入重试 @重试 def do_something_unreliable(): 如果 random.randint(0, 10) > 1: raise IOError("酱汁坏了,一切都被冲洗掉了!!!111one") 别的: 返回“很棒的酱汁!” 打印(do_something_unreliable())
..测试输出:: :隐藏: 很棒的酱汁!
.. 目录树:: :隐: :最大深度:2 变更日志 应用程序编程接口
要安装tenacity ,只需:
$ pip install tenacity
..测试设置:: 导入日志记录 # # 注意以下导入仅用于演示方便。 # 生产代码应始终显式导入所需的名称。 # 来自坚韧进口* 类 MyException(异常): 经过
正如您在上面看到的,默认行为是在引发异常时永远重试而不等待。
..测试代码:: @重试 def never_gonna_give_you_up(): print("永远重试,忽略异常,不要在重试之间等待") 引发异常
让我们少一些执着,设定一些界限,比如放弃之前的尝试次数。
..测试代码:: @重试(停止=尝试后停止(7)) def stop_after_7_attempts(): print("尝试7次后停止") 引发异常
我们没有一整天的时间,所以让我们为应该重试的时间设定一个界限。
..测试代码:: @重试(停止= stop_after_delay(10)) def stop_after_10_s(): print("10秒后停止") 引发异常
如果您的截止日期很紧,并且不能超过延迟时间,那么您可以在超过延迟时间之前放弃重试一次。
..测试代码:: @重试(停止= stop_before_delay(10)) def stop_before_10_s(): print("10 秒前停止 1 次尝试") 引发异常
您可以使用 | 组合多个停止条件操作员:
..测试代码:: @重试(停止=(stop_after_delay(10)| stop_after_attempt(5))) def stop_after_10_s_or_5_retries(): print("10秒或重试5次后停止") 引发异常
大多数事情不希望尽快轮询,因此我们在重试之间等待 2 秒。
..测试代码:: @重试(等待= wait_fixed(2)) def wait_2_s(): print("重试之间等待 2 秒") 引发异常
有些事情在注入一点随机性时表现最好。
..测试代码:: @重试(等待= wait_random(最小值= 1,最大值= 2)) def wait_random_1_to_2_s(): print("重试之间随机等待 1 到 2 秒") 引发异常
但话又说回来,在重试分布式服务和其他远程端点时,很难击败指数退避。
..测试代码:: @重试(等待= wait_exponential(乘数= 1,最小值= 4,最大值= 10)) def wait_exponential_1(): print("每次重试之间等待 2^x * 1 秒,从 4 秒开始,然后最多 10 秒,然后再等待 10 秒") 引发异常
话又说回来,在重试分布式服务和其他远程端点时,将固定等待和抖动(以帮助避免惊群)相结合也很难被击败。
..测试代码:: @retry(等待=wait_fixed(3) + wait_random(0, 2)) def wait_fixed_jitter(): print("等待至少3秒,并添加最多2秒的随机延迟") 引发异常
当多个进程争夺共享资源时,呈指数增长的抖动有助于最大限度地减少冲突。
..测试代码:: @重试(等待= wait_random_exponential(乘数= 1,最大值= 60)) def wait_exponential_jitter(): print("每次重试之间随机等待最多 2^x * 1 秒,直到范围达到 60 秒,然后随机等待 60 秒") 引发异常
有时有必要建立一个退避链。
..测试代码:: @retry(wait=wait_chain(*[wait_fixed(3) for i in range(3)] + [wait_fixed(7) for i in range(2)] + [等待固定(9)])) def wait_fixed_chained(): print("3 次尝试等待 3 秒,接下来的 2 次尝试等待 7 秒,此后的所有尝试等待 9 秒") 引发异常
我们有几种选择来处理引发特定或一般异常的重试,就像这里的情况一样。
..测试代码:: 类客户端错误(异常): """某种类型的客户端错误。""" @retry(重试=retry_if_exception_type(IOError)) def might_io_error(): print("如果发生 IOError,则永远重试,无需等待,引发任何其他错误") 引发异常 @retry(重试=retry_if_not_exception_type(ClientError)) def might_client_error(): print("如果发生除 ClientError 之外的任何错误,则永远重试,无需等待。立即引发 ClientError。") 引发异常
我们还可以使用函数的结果来改变重试的行为。
..测试代码:: def is_none_p(值): """如果值为 None,则返回 True""" 返回值为无 @retry(重试=retry_if_result(is_none_p)) def might_return_none(): print("如果返回值为 None,则不等待重试")
另请参阅这些方法:
..测试代码:: 如果异常则重试 如果异常类型则重试 如果不异常则重试类型 重试除非异常类型 如果结果重试 如果没有结果则重试 重试如果_异常_消息 如果不异常则重试消息 重试任意 全部重试
我们还可以组合几个条件:
..测试代码:: def is_none_p(值): """如果值为 None,则返回 True""" 返回值为无 @retry(重试=(retry_if_result(is_none_p) | retry_if_exception_type())) def might_return_none(): print("如果返回值为 None,则永远重试,忽略异常,无需等待")
还支持停止、等待等任意组合,让您可以自由地混合和匹配。
还可以通过引发 TryAgain 异常来随时显式重试:
..测试代码:: @重试 def do_something(): 结果 = some_else() 如果结果 == 23: 提出再试一次
通常,当您的函数最后一次失败(并且不会根据您的设置再次重试)时,会引发 RetryError。您的代码遇到的异常将显示在堆栈跟踪中间的某个位置。
如果您希望在堆栈跟踪末尾(最明显的位置)看到代码遇到的异常,则可以设置 reraise=True。
..测试代码:: @retry(reraise=True, stop=stop_after_attempt(3)) def raise_my_exception(): 引发 MyException(“失败”) 尝试: 引发我的异常() 除了我的异常: # 重试超时 经过
通过使用 before 回调函数,可以在尝试调用该函数之前执行一个操作:
..测试代码:: 导入日志记录 导入系统 logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) 记录器=logging.getLogger(__name__) @retry(stop=stop_after_attempt(3), before=before_log(logger,logging.DEBUG)) def raise_my_exception(): 引发 MyException(“失败”)
本着同样的精神,可以在调用失败后执行:
..测试代码:: 导入日志记录 导入系统 logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) 记录器=logging.getLogger(__name__) @retry(stop=stop_after_attempt(3), after=after_log(logger,logging.DEBUG)) def raise_my_exception(): 引发 MyException(“失败”)
也可以只记录将要重试的失败。通常重试发生在等待间隔之后,因此关键字参数称为before_sleep
:
..测试代码:: 导入日志记录 导入系统 logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) 记录器=logging.getLogger(__name__) @重试(停止=尝试后停止(3), before_sleep = before_sleep_log(记录器,logging.DEBUG)) def raise_my_exception(): 引发 MyException(“失败”)
您可以使用函数附加的统计信息属性来访问有关函数重试的统计信息:
..测试代码:: @重试(停止=尝试后停止(3)) def raise_my_exception(): 引发 MyException(“失败”) 尝试: 引发我的异常() 除了例外: 经过 打印(raise_my_exception.statistics)
..测试输出:: :隐藏: ...
您还可以定义自己的回调。回调应该接受一个名为retry_state
的参数,其中包含有关当前重试调用的所有信息。
例如,您可以在所有重试失败后调用自定义回调函数,而不会引发异常(或者您可以重新引发或执行任何操作)
..测试代码:: def return_last_value(重试状态): """返回最后一次调用尝试的结果""" 返回 retry_state.outcome.result() def is_false(值): """如果值为 False,则返回 True""" 返回值为False # 尝试3次后得到不同的结果将返回False @重试(停止=尝试后停止(3), retry_error_callback=return_last_value, 重试=retry_if_result(is_false)) def Final_return_false(): 返回错误
retry_state
参数是:class:`~tenacity.RetryCallState`类的对象。
还可以为其他关键字参数定义自定义回调。
.. 函数:: my_stop(retry_state) :param RetryCallState retry_state: 有关当前重试调用的信息 :return: 重试是否应该停止 :r 类型:布尔值
.. 函数:: my_wait(retry_state) :param RetryCallState retry_state: 有关当前重试调用的信息 :return: 下次重试之前等待的秒数 :r类型:浮点数
.. 函数:: my_retry(retry_state) :param RetryCallState retry_state: 有关当前重试调用的信息 :return: 是否继续重试 :r 类型:布尔值
.. 函数:: my_before(retry_state) :param RetryCallState retry_state: 有关当前重试调用的信息
.. 函数:: my_after(retry_state) :param RetryCallState retry_state: 有关当前重试调用的信息
.. 函数:: my_before_sleep(retry_state) :param RetryCallState retry_state: 有关当前重试调用的信息
这是一个自定义before_sleep
函数的示例:
..测试代码:: 导入日志记录 logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) 记录器=logging.getLogger(__name__) def my_before_sleep(retry_state): 如果 retry_state.attempt_number < 1: 日志级别 = 日志记录.INFO 别的: 日志级别 = 日志记录.警告 记录器.log( loglevel, '重试 %s: 尝试 %s 结束于: %s', retry_state.fn、retry_state.attempt_number、retry_state.outcome) @retry(stop=stop_after_attempt(3), before_sleep=my_before_sleep) def raise_my_exception(): 引发 MyException(“失败”) 尝试: 引发我的异常() 除了重试错误: 经过
您可以在调用重试装饰器时根据需要更改其参数,方法是使用附加到包装函数的 retry_with 函数:
..测试代码:: @重试(停止=尝试后停止(3)) def raise_my_exception(): 引发 MyException(“失败”) 尝试: raise_my_exception.retry_with(stop=stop_after_attempt(4))() 除了例外: 经过 打印(raise_my_exception.statistics)
..测试输出:: :隐藏: ...
如果你想使用变量来设置重试参数,则不必使用重试装饰器 - 你可以直接使用 Retrying:
..测试代码:: def never_good_enough(arg1): 引发异常('无效参数:{}'.format(arg1)) def try_never_good_enough(max_attempts=3): 重试器 = 重试(stop=stop_after_attempt(max_attempts), reraise=True) retryer(never_good_enough, '我真的尝试过')
您可能还想暂时更改修饰函数的行为,例如在测试中以避免不必要的等待时间。您可以修改/修补附加到函数的重试属性。请记住,这是一个只写属性,统计信息应该从函数统计属性中读取。
..测试代码:: @retry(stop=stop_after_attempt(3), wait=wait_fixed(3)) def raise_my_exception(): 引发 MyException(“失败”) 从单元测试导入模拟 与mock.patch.object(raise_my_exception.retry,“等待”,wait_fixed(0)): 尝试: 引发我的异常() 除了例外: 经过 打印(raise_my_exception.statistics)
..测试输出:: :隐藏: ...
Tenacity 允许您重试代码块,而无需将其包装在独立的函数中。这使得在共享上下文时可以轻松隔离故障块。诀窍是将 for 循环和上下文管理器结合起来。
..测试代码:: from tenacity import 重试、重试错误、stop_after_attempt 尝试: 对于重试的尝试(stop=stop_after_attempt(3)): 尝试: 引发异常('我的代码失败!') 除了重试错误: 经过
您可以通过配置 Retrying 对象来配置重试策略的各个细节。
对于异步代码,您可以使用 AsyncRetrying。
..测试代码:: from tenacity import AsyncRetrying、RetryError、stop_after_attempt 异步定义函数(): 尝试: AsyncRetrying 中的尝试异步(stop=stop_after_attempt(3)): 尝试: 引发异常('我的代码失败!') 除了重试错误: 经过
在这两种情况下,您可能希望将结果设置为尝试,以便在retry_if_result
等重试策略中可用。这可以通过访问retry_state
属性来完成:
..测试代码:: 从坚韧导入AsyncRetrying,retry_if_result 异步定义函数(): 异步尝试 AsyncRetrying(retry=retry_if_result(lambda x: x < 3)): 尝试: result = 1 # 一些复杂的计算、函数调用等。 如果不是尝试.retry_state.outcome.failed: attempts.retry_state.set_result(结果) 返回结果
最后, retry
也适用于 asyncio、Trio 和 Tornado (>= 4.5) 协程。睡眠也是异步完成的。
@ retry
async def my_asyncio_function ( loop ):
await loop . getaddrinfo ( '8.8.8.8' , 53 )
@ retry
async def my_async_trio_function ():
await trio . socket . getaddrinfo ( '8.8.8.8' , 53 )
@ retry
@ tornado . gen . coroutine
def my_async_tornado_function ( http_client , url ):
yield http_client . fetch ( url )
您甚至可以通过传递正确的睡眠函数来使用替代事件循环,例如 curio:
@ retry ( sleep = curio . sleep )
async def my_async_curio_function ():
await asks . get ( 'https://example.org' )
reno 用于管理变更日志。看看他们的使用文档。
文档生成将自动编译变更日志。您只需添加它们即可。
# Opens a template file in an editor
tox -e reno -- new some-slug-for-my-change --edit