より良いエクスペリエンスを得るには、テナシティのドキュメントを参照してください。
Tenacity は、Python で書かれた Apache 2.0 ライセンスの汎用再試行ライブラリで、ほぼあらゆるものに再試行動作を追加するタスクを簡素化します。これは再試行のフォークに由来しますが、残念ながら現在は維持されていません。 Tenacity は再試行と互換性のある API ではありませんが、重要な新機能が追加され、長年のバグが修正されています。
最も単純な使用例は、例外が発生するたびに、値が返されるまで不安定な関数を再試行することです。
.. テストコード:: ランダムにインポート テナシティインポートの再試行から @リトライ def do_something_unreliable(): ランダム.randint(0, 10) > 1の場合: raise IOError("壊れたソース、すべてがホースで覆われています!!!111one") それ以外: 返す「ソースがすごい!」 print(do_something_unreliable())
.. テスト出力:: :隠れる: すごいソース!
..toctree:: :隠れた: :最大深さ: 2 変更履歴 API
Tenacityをインストールするには、次のようにします。
$ pip install tenacity
.. テストセットアップ:: インポートログ # # 次のインポートはデモンストレーションの便宜のためにのみ使用されていることに注意してください。 # 実稼働コードでは、必要な名前を常に明示的にインポートする必要があります。 # 粘り強いインポートから* クラスMyException(例外): 合格
上で見たように、デフォルトの動作では、例外が発生したときに待機せずに永久に再試行します。
.. テストコード:: @リトライ def Never_gonna_give_you_up(): print("例外を無視して永久に再試行します。再試行の間に待機しません") 例外を発生させる
もう少し固執を減らして、諦めるまでの試行回数など、いくつかの境界線を設定しましょう。
.. テストコード:: @retry(stop=stop_after_attempt(7)) def stop_after_7_attempts(): print("7回試行後に停止します") 例外を発生させる
一日中は時間がないので、再試行する時間の境界を設定しましょう。
.. テストコード:: @retry(stop=stop_after_lay(10)) def stop_after_10_s(): print("10秒後に停止します") 例外を発生させる
期限が迫っており、遅延時間を超過することが問題である場合は、遅延時間を超過する 1 回前に再試行を諦めることができます。
.. テストコード:: @retry(stop=stop_before_delay(10)) def stop_before_10_s(): print("10 秒前に 1 回の試行を停止します") 例外を発生させる
| を使用して、複数の停止条件を組み合わせることができます。オペレーター:
.. テストコード:: @retry(stop=(stop_after_lay(10) | stop_after_attempt(5))) def stop_after_10_s_or_5_retries(): print("10 秒後または 5 回の再試行後に停止します") 例外を発生させる
ほとんどの場合、できるだけ早くポーリングされることは望ましくありません。そのため、再試行の間に 2 秒間待機するようにしましょう。
.. テストコード:: @retry(wait=wait_fixed(2)) def wait_2_s(): print("再試行の間に 2 秒待ちます") 例外を発生させる
一部のものには、少しランダム性を注入すると最高のパフォーマンスが得られます。
.. テストコード:: @retry(wait=wait_random(min=1, max=2)) def wait_random_1_to_2_s(): print("再試行の間にランダムに 1 ~ 2 秒待機します") 例外を発生させる
ただし、分散サービスや他のリモート エンドポイントを再試行する際の指数関数的なバックオフを克服するのは困難です。
.. テストコード:: @retry(wait=wait_exponential(乗数=1、最小=4、最大=10)) def wait_exponential_1(): print("各再試行の間に 2^x * 1 秒待機します。最初は 4 秒、その後は最大 10 秒、その後は 10 秒") 例外を発生させる
繰り返しになりますが、分散サービスや他のリモート エンドポイントを再試行するときに、固定待機とジッター (雷の群れを避けるために) を組み合わせることも困難です。
.. テストコード:: @retry(wait=wait_fixed(3) + wait_random(0, 2)) def wait_fixed_jitter(): print("少なくとも 3 秒待って、最大 2 秒のランダムな遅延を追加します") 例外を発生させる
複数のプロセスが共有リソースをめぐって競合している場合、ジッターが指数関数的に増加すると、衝突を最小限に抑えることができます。
.. テストコード:: @retry(wait=wait_random_exponential(multiplier=1, max=60)) def wait_exponential_jitter(): print("範囲が 60 秒に達するまで、各再試行の間にランダムに最大 2^x * 1 秒待機し、その後はランダムに最大 60 秒待機します") 例外を発生させる
場合によっては、一連のバックオフを構築する必要があります。
.. テストコード:: @retry(wait=wait_chain(*[wait_fixed(3) for i in range(3)] + [範囲(2)のiのwait_fixed(7)] + [wait_fixed(9)])) def wait_fixed_chained(): print("3 回の試行で 3 秒、次の 2 回の試行で 7 秒、その後のすべての試行で 9 秒待ちます") 例外を発生させる
ここのケースのように、特定または一般的な例外を引き起こす再試行に対処するためのオプションがいくつかあります。
.. テストコード:: クラス ClientError(例外): """何らかのクライアント エラーです。""" @retry(retry=retry_if_Exception_type(IOError)) def might_io_error(): print("IOError が発生した場合は待機せずに永久に再試行し、他のエラーが発生した場合は") 例外を発生させる @retry(retry=retry_if_not_Exception_type(ClientError)) def might_client_error(): print("ClientError以外のエラーが発生した場合は待ち時間なしで永久に再試行します。直ちにClientErrorを発生させます。") 例外を発生させる
関数の結果を使用して、再試行の動作を変更することもできます。
.. テストコード:: def is_none_p(値): """値が None の場合は True を返します""" 戻り値はNoneです @retry(retry=retry_if_result(is_none_p)) def might_return_none(): print("戻り値がNoneの場合は待機せずに再試行します")
以下の方法も参照してください。
.. テストコード:: 例外の再試行 retry_if_Exception_type retry_if_not_Exception_type retry_unless_Exception_type 結果の再試行 retry_if_not_result 例外メッセージの再試行 retry_if_not_例外メッセージ 再試行_任意 全て再試行
いくつかの条件を組み合わせることもできます。
.. テストコード:: def is_none_p(値): """値が None の場合は True を返します""" 戻り値はNoneです @retry(retry=(retry_if_result(is_none_p) | retry_if_Exception_type())) def might_return_none(): print("戻り値が None の場合、待機せずに例外を無視して永久に再試行します")
停止、待機などの任意の組み合わせもサポートされており、自由に組み合わせることができます。
TryAgain 例外を発生させることで、いつでも明示的に再試行することもできます。
.. テストコード:: @リトライ def do_something(): 結果 = something_else() 結果 == 23 の場合: TryAgain を発生させる
通常、関数が最後に失敗すると (設定に基づいて再試行されません)、RetryError が発生します。コードで発生した例外は、スタック トレースの途中に表示されます。
コードで発生した例外をスタック トレースの最後(最も目立つ場所) で確認したい場合は、reraise=True を設定できます。
.. テストコード:: @retry(reraise=True、stop=stop_after_attempt(3)) def raise_my_Exception(): raise MyException("失敗") 試す: raise_my_Exception() MyException を除く: # 再試行でタイムアウトになりました 合格
before コールバック関数を使用すると、関数の呼び出しを試みる前にアクションを実行できます。
.. テストコード:: インポートログ インポートシステム logging.basicConfig(stream=sys.stderr、level=logging.DEBUG) ロガー =logging.getLogger(__name__) @retry(stop=stop_after_attempt(3), before=before_log(logger,logging.DEBUG)) def raise_my_Exception(): raise MyException("失敗")
同じ精神で、呼び出しが失敗した後に実行することも可能です。
.. テストコード:: インポートログ インポートシステム logging.basicConfig(stream=sys.stderr、level=logging.DEBUG) ロガー =logging.getLogger(__name__) @retry(stop=stop_after_attempt(3), after=after_log(logger,logging.DEBUG)) def raise_my_Exception(): raise MyException("失敗")
再試行される失敗のみをログに記録することもできます。通常、再試行は待機間隔の後に行われるため、キーワード引数はbefore_sleep
と呼ばれます。
.. テストコード:: インポートログ インポートシステム logging.basicConfig(stream=sys.stderr、level=logging.DEBUG) ロガー =logging.getLogger(__name__) @retry(stop=stop_after_attempt(3), before_sleep=before_sleep_log(ロガー、ロギング.DEBUG)) def raise_my_Exception(): raise MyException("失敗")
関数に付加された統計属性を使用すると、関数に対して行われた再試行に関する統計にアクセスできます。
.. テストコード:: @retry(stop=stop_after_attempt(3)) def raise_my_Exception(): raise MyException("失敗") 試す: raise_my_Exception() 例外を除く: 合格 print(raise_my_Exception.statistics)
.. テスト出力:: :隠れる: ...
独自のコールバックを定義することもできます。コールバックは、現在の再試行呼び出しに関するすべての情報を含むretry_state
という 1 つのパラメーターを受け入れる必要があります。
たとえば、すべての再試行が失敗した後、例外を発生させずにカスタム コールバック関数を呼び出すことができます (あるいは、再発生させたり、実際に何かを行うこともできます)。
.. テストコード:: def return_last_value(retry_state): """最後の通話試行の結果を返します""" retry_state.outcome.result() を返す def is_false(値): """値が False の場合は True を返します""" 戻り値はFalseです # 異なる結果を得るために 3 回試行すると False が返されます @retry(stop=stop_after_attempt(3), retry_error_callback=return_last_value、 retry=retry_if_result(is_false)) def やがて_return_false(): Falseを返す
retry_state
引数は、:class:`~tenacity.RetryCallState`クラスのオブジェクトです。
他のキーワード引数に対してカスタム コールバックを定義することもできます。
.. 関数:: my_stop(retry_state) :param RetryCallState retry_state: 現在の再試行呼び出しに関する情報 :return: 再試行を停止するかどうか :rtype: ブール
.. 関数:: my_wait(retry_state) :param RetryCallState retry_state: 現在の再試行呼び出しに関する情報 :return: 次の再試行までに待機する秒数 :rtype: float
.. 関数:: my_retry(retry_state) :param RetryCallState retry_state: 現在の再試行呼び出しに関する情報 :return: 再試行を続行するかどうか :rtype: ブール値
.. 関数:: my_before(retry_state) :param RetryCallState retry_state: 現在の再試行呼び出しに関する情報
.. 関数:: my_after(retry_state) :param RetryCallState retry_state: 現在の再試行呼び出しに関する情報
.. 関数:: my_before_sleep(retry_state) :param RetryCallState retry_state: 現在の再試行呼び出しに関する情報
カスタムbefore_sleep
関数の例を次に示します。
.. テストコード:: インポートログ logging.basicConfig(stream=sys.stderr、level=logging.DEBUG) ロガー =logging.getLogger(__name__) def my_before_sleep(retry_state): retry_state.attempt_number < 1 の場合: ログレベル = ロギング.INFO それ以外: ログレベル = ロギング.警告 ロガー.log( ログレベル、'%s を再試行中: %s の試行は次で終了しました: %s', retry_state.fn、retry_state.attempt_number、retry_state.outcome) @retry(stop=stop_after_attempt(3), before_sleep=my_before_sleep) def raise_my_Exception(): raise MyException("失敗") 試す: raise_my_Exception() RetryError を除く: 合格
ラップされた関数にアタッチされた retry_with 関数を使用して、再試行デコレータを呼び出すときに、必要に応じてその引数を変更できます。
.. テストコード:: @retry(stop=stop_after_attempt(3)) def raise_my_Exception(): raise MyException("失敗") 試す: raise_my_Exception.retry_with(stop=stop_after_attempt(4))() 例外を除く: 合格 print(raise_my_Exception.statistics)
.. テスト出力:: :隠れる: ...
変数を使用して再試行パラメーターを設定する場合は、再試行デコレーターを使用する必要はありません。代わりに Retrying を直接使用できます。
.. テストコード:: def Never_good_enough(arg1): raise Exception('無効な引数: {}'.format(arg1)) def try_never_good_enough(max_attempts=3): retryer = 再試行中(stop=stop_after_attempts(max_attempts), reraise=True) retryer(never_good_enough、「本当にやってみます」)
不必要な待ち時間を避けるために、テストなどで装飾された関数の動作を一時的に変更することもできます。関数に付加された再試行属性を変更/パッチすることができます。これは書き込み専用の属性であることに注意してください。統計は関数の統計属性から読み取る必要があります。
.. テストコード:: @retry(stop=stop_after_attempt(3), wait=wait_fixed(3)) def raise_my_Exception(): raise MyException("失敗") 単体テストインポートモックから モック.patch.object(raise_my_Exception.retry, "wait", wait_fixed(0)) を使用: 試す: raise_my_Exception() 例外を除く: 合格 print(raise_my_Exception.statistics)
.. テスト出力:: :隠れる: ...
Tenacity を使用すると、コード ブロックを独立した関数でラップすることなく、コード ブロックを再試行できます。これにより、コンテキストを共有しながら、障害のあるブロックを簡単に分離できます。秘訣は、for ループとコンテキスト マネージャーを組み合わせることです。
.. テストコード:: from tenacity import 再試行、RetryError、stop_after_attempt 試す: 再試行中の試行(stop=stop_after_attempt(3)): 試してみると: raise Exception('コードが失敗しています!') RetryError を除く: 合格
再試行オブジェクトを構成することで、再試行ポリシーの詳細をすべて構成できます。
非同期コードでは、AsyncRetrying を使用できます。
.. テストコード:: Tenacity import AsyncRetrying、RetryError、stop_after_attempt から 非同期定義関数(): 試す: AsyncRetrying(stop=stop_after_attempt(3)) での試行のための非同期: 試してみると: raise Exception('コードが失敗しています!') RetryError を除く: 合格
どちらの場合も、結果を試行に設定して、 retry_if_result
などの再試行戦略で使用できるようにすることができます。これは、 retry_state
プロパティにアクセスすることで実行できます。
.. テストコード:: Tenacity import AsyncRetrying から、retry_if_result 非同期定義関数(): AsyncRetrying(retry=retry_if_result(lambda x: x < 3)) での試行のための async: 試してみると: result = 1 # 複雑な計算、関数呼び出しなど。 そうでない場合は、attempt.retry_state.outcome.failed: 試行.retry_state.set_result(結果) 結果を返す
最後に、 retry
asyncio、Trio、Tornado (>= 4.5) コルーチンでも機能します。スリープも非同期で行われます。
@ retry
async def my_asyncio_function ( loop ):
await loop . getaddrinfo ( '8.8.8.8' , 53 )
@ retry
async def my_async_trio_function ():
await trio . socket . getaddrinfo ( '8.8.8.8' , 53 )
@ retry
@ tornado . gen . coroutine
def my_async_tornado_function ( http_client , url ):
yield http_client . fetch ( url )
正しい sleep 関数を渡すことで、curio などの代替イベント ループを使用することもできます。
@ retry ( sleep = curio . sleep )
async def my_async_curio_function ():
await asks . get ( 'https://example.org' )
reno は変更ログの管理に使用されます。使用法に関するドキュメントをご覧ください。
ドキュメントの生成により、変更ログが自動的にコンパイルされます。それらを追加するだけです。
# Opens a template file in an editor
tox -e reno -- new some-slug-for-my-change --edit