Veuillez vous référer à la documentation sur la ténacité pour une meilleure expérience.
Tenacity est une bibliothèque de nouvelle tentative sous licence Apache 2.0, écrite en Python, pour simplifier la tâche d'ajout d'un comportement de nouvelle tentative à presque tout. Cela provient d’un fork de réessais qui n’est malheureusement plus maintenu. Tenacity n'est pas compatible avec les nouvelles tentatives, mais ajoute de nouvelles fonctionnalités importantes et corrige un certain nombre de bugs de longue date.
Le cas d'utilisation le plus simple consiste à réessayer une fonction irrégulière chaque fois qu'une exception se produit jusqu'à ce qu'une valeur soit renvoyée.
.. code de test :: importer au hasard à partir de la nouvelle tentative d'importation de ténacité @réessayer def do_something_unreliable() : si random.randint(0, 10) > 1 : raise IOError("Sauce cassée, tout est arrosé !!!111one") autre: return "Sauce géniale!" print(do_something_unreliable())
.. sortie de test :: :cacher: Super sauce!
.. toctree :: :caché: :profondeur maximale : 2 journal des modifications API
Pour installer Tenacity , il suffit de :
$ pip install tenacity
.. configuration de test :: journalisation des importations # # Notez que l'importation suivante est utilisée uniquement à des fins de démonstration. # Le code de production doit toujours importer explicitement les noms dont il a besoin. # de l'importation de ténacité * classe MyException(Exception): passer
Comme vous l'avez vu ci-dessus, le comportement par défaut consiste à réessayer indéfiniment sans attendre lorsqu'une exception est levée.
.. code de test :: @réessayer def never_gonna_give_you_up() : print("Réessayez pour toujours en ignorant les exceptions, n'attendez pas entre les tentatives") déclencher une exception
Soyons un peu moins persévérants et fixons quelques limites, comme le nombre de tentatives avant d'abandonner.
.. code de test :: @retry(stop=stop_after_attempt(7)) def stop_after_7_attempts() : print("Arrêt après 7 tentatives") déclencher une exception
Nous n'avons pas toute la journée, alors fixons une limite pendant laquelle nous devrions réessayer des choses.
.. code de test :: @retry(stop=stop_after_delay(10)) def stop_after_10_s() : print("Arrêt après 10 secondes") déclencher une exception
Si vous avez un délai serré et que dépasser votre délai n'est pas acceptable, vous pouvez alors abandonner les tentatives une fois avant de dépasser le délai.
.. code de test :: @retry(stop=stop_before_delay(10)) def stop_before_10_s() : print("Arrêter 1 tentative avant 10 secondes") déclencher une exception
Vous pouvez combiner plusieurs conditions d'arrêt en utilisant le | opérateur:
.. code de test :: @retry(stop=(stop_after_delay(10) | stop_after_attempt(5))) def stop_after_10_s_or_5_retries() : print("Arrêt après 10 secondes ou 5 tentatives") déclencher une exception
La plupart des éléments n'aiment pas être interrogés aussi rapidement que possible, alors attendons simplement 2 secondes entre les tentatives.
.. code de test :: @retry(wait=wait_fixed(2)) def wait_2_s() : print("Attendez 2 secondes entre les tentatives") déclencher une exception
Certaines choses fonctionnent mieux avec un peu d’aléatoire.
.. code de test :: @retry(wait=wait_random(min=1, max=2)) def wait_random_1_to_2_s() : print("Attendez aléatoirement 1 à 2 secondes entre les tentatives") déclencher une exception
Là encore, il est difficile de battre l’intervalle exponentiel lors de la nouvelle tentative de services distribués et d’autres points de terminaison distants.
.. code de test :: @retry(wait=wait_exponential(multiplier=1, min=4, max=10)) def wait_exponential_1() : print("Attendez 2^x * 1 seconde entre chaque nouvelle tentative en commençant par 4 secondes, puis jusqu'à 10 secondes, puis 10 secondes après") déclencher une exception
Là encore, il est également difficile de battre la combinaison d'attentes fixes et de gigue (pour éviter les troupeaux tonitruants) lors de la nouvelle tentative de services distribués et d'autres points de terminaison distants.
.. code de test :: @retry(wait=wait_fixed(3) + wait_random(0, 2)) def wait_fixed_jitter() : print("Attendez au moins 3 secondes et ajoutez jusqu'à 2 secondes de délai aléatoire") déclencher une exception
Lorsque plusieurs processus sont en conflit pour une ressource partagée, l’augmentation exponentielle de la gigue permet de minimiser les collisions.
.. code de test :: @retry(wait=wait_random_exponential(multiplier=1, max=60)) def wait_exponential_jitter() : print("Attendez aléatoirement jusqu'à 2^x * 1 secondes entre chaque nouvelle tentative jusqu'à ce que la plage atteigne 60 secondes, puis aléatoirement jusqu'à 60 secondes après") déclencher une exception
Il est parfois nécessaire de construire une chaîne de backoffs.
.. code de test :: @retry(wait=wait_chain(*[wait_fixed(3) for i in range(3)] + [wait_fixed(7) pour i dans range(2)] + [wait_fixed(9)])) def wait_fixed_chained() : print("Attendez 3 secondes pour 3 tentatives, 7 secondes pour les 2 tentatives suivantes et 9 secondes pour toutes les tentatives suivantes") déclencher une exception
Nous disposons de quelques options pour gérer les nouvelles tentatives qui soulèvent des exceptions spécifiques ou générales, comme dans les cas ici.
.. code de test :: classe ClientError (Exception) : """Un type d'erreur client.""" @retry(retry=retry_if_exception_type(IOError)) def could_io_error() : print("Réessayez indéfiniment sans attendre si une erreur IOError se produit, déclenchez d'autres erreurs") déclencher une exception @retry(retry=retry_if_not_exception_type(ClientError)) def could_client_error() : print("Réessayez indéfiniment sans attendre si une erreur autre que ClientError se produit. Augmentez immédiatement ClientError.") déclencher une exception
Nous pouvons également utiliser le résultat de la fonction pour modifier le comportement des nouvelles tentatives.
.. code de test :: def is_none_p(valeur) : """Renvoie True si la valeur est Aucune""" la valeur de retour est Aucune @retry(retry=retry_if_result(is_none_p)) def could_return_none() : print("Réessayez sans attendre si la valeur de retour est Aucune")
Voir aussi ces méthodes :
.. code de test :: retry_if_exception retry_if_exception_type retry_if_not_exception_type retry_unless_exception_type retry_if_result retry_if_not_result retry_if_exception_message retry_if_not_exception_message réessayer_any réessayer_tout
On peut également combiner plusieurs conditions :
.. code de test :: def is_none_p(valeur) : """Renvoie True si la valeur est Aucune""" la valeur de retour est Aucune @retry(retry=(retry_if_result(is_none_p) | retry_if_exception_type())) def could_return_none() : print("Réessayez définitivement en ignorant les exceptions sans attendre si la valeur de retour est Aucune")
Toute combinaison d'arrêt, d'attente, etc. est également prise en charge pour vous donner la liberté de mélanger et d'assortir.
Il est également possible de réessayer explicitement à tout moment en déclenchant l'exception TryAgain :
.. code de test :: @réessayer def do_something() : résultat = quelque chose_autre() si résultat == 23 : augmenter TryAgain
Normalement, lorsque votre fonction échoue la dernière fois (et ne sera pas réessayée en fonction de vos paramètres), une RetryError est générée. L'exception rencontrée par votre code sera affichée quelque part au milieu de la trace de la pile.
Si vous préférez voir l'exception rencontrée par votre code à la fin de la trace de la pile (là où elle est la plus visible), vous pouvez définir reraise=True.
.. code de test :: @retry(reraise=True, stop=stop_after_attempt(3)) def raise_my_exception() : lever MyException ("Échec") essayer: raise_my_exception() sauf MyException : # nouvelle tentative expirée passer
Il est possible d'exécuter une action avant toute tentative d'appel de la fonction en utilisant la fonction de rappel before :
.. code de test :: journalisation des importations système d'importation logging.basicConfig(stream=sys.stderr, niveau=logging.DEBUG) enregistreur = journalisation.getLogger(__name__) @retry(stop=stop_after_attempt(3), before=before_log(logger, logging.DEBUG)) def raise_my_exception() : lever MyException ("Échec")
Dans le même esprit, il est possible d'exécuter après un appel ayant échoué :
.. code de test :: journalisation des importations système d'importation logging.basicConfig(stream=sys.stderr, niveau=logging.DEBUG) enregistreur = journalisation.getLogger(__name__) @retry(stop=stop_after_attempt(3), after=after_log(logger, logging.DEBUG)) def raise_my_exception() : lever MyException ("Échec")
Il est également possible de consigner uniquement les échecs qui vont être réessayés. Normalement, les nouvelles tentatives se produisent après un intervalle d'attente, donc l'argument mot-clé est appelé before_sleep
:
.. code de test :: journalisation des importations système d'importation logging.basicConfig(stream=sys.stderr, niveau=logging.DEBUG) enregistreur = journalisation.getLogger(__name__) @retry(stop=stop_after_attempt(3), before_sleep=before_sleep_log(logger, logging.DEBUG)) def raise_my_exception() : lever MyException ("Échec")
Vous pouvez accéder aux statistiques sur les nouvelles tentatives effectuées sur une fonction en utilisant l'attribut statistiques attaché à la fonction :
.. code de test :: @retry(stop=stop_after_attempt(3)) def raise_my_exception() : lever MyException ("Échec") essayer: raise_my_exception() sauf exception : passer print(raise_my_exception.statistiques)
.. sortie de test :: :cacher: ...
Vous pouvez également définir vos propres rappels. Le rappel doit accepter un paramètre appelé retry_state
qui contient toutes les informations sur l'appel de nouvelle tentative en cours.
Par exemple, vous pouvez appeler une fonction de rappel personnalisée après l'échec de toutes les tentatives, sans déclencher d'exception (ou vous pouvez relancer ou faire n'importe quoi)
.. code de test :: def return_last_value (retry_state) : """renvoie le résultat de la dernière tentative d'appel""" retourner retry_state.outcome.result() def is_false(valeur) : """Renvoie True si la valeur est False""" la valeur de retour est fausse # renverra False après avoir essayé 3 fois d'obtenir un résultat différent @retry(stop=stop_after_attempt(3), retry_error_callback=return_last_value, retry=retry_if_result(is_false)) def éventuellement_return_false() : retourner Faux
L'argument retry_state
est un objet de la classe :class:`~tenacity.RetryCallState` .
Il est également possible de définir des rappels personnalisés pour d'autres arguments de mots-clés.
.. fonction :: mon_stop(retry_state) :param RetryCallState retry_state : informations sur l'appel de nouvelle tentative en cours :return : si les nouvelles tentatives doivent s'arrêter ou non :rtype: booléen
.. fonction :: my_wait(retry_state) :param RetryCallState retry_state : informations sur l'appel de nouvelle tentative en cours :return : nombre de secondes à attendre avant la prochaine tentative :rtype: flottant
.. fonction :: my_retry(retry_state) :param RetryCallState retry_state : informations sur l'appel de nouvelle tentative en cours :return : si la nouvelle tentative doit continuer ou non :rtype: booléen
.. fonction :: my_before(retry_state) :param RetryCallState retry_state : informations sur l'appel de nouvelle tentative en cours
.. fonction :: my_after(retry_state) :param RetryCallState retry_state : informations sur l'appel de nouvelle tentative en cours
.. fonction :: my_before_sleep(retry_state) :param RetryCallState retry_state : informations sur l'appel de nouvelle tentative en cours
Voici un exemple avec une fonction before_sleep
personnalisée :
.. code de test :: journalisation des importations logging.basicConfig(stream=sys.stderr, niveau=logging.DEBUG) enregistreur = journalisation.getLogger(__name__) def mon_before_sleep(retry_state) : si retry_state.attempt_number < 1 : niveau de journalisation = journalisation.INFO autre: loglevel = journalisation.AVERTISSEMENT enregistreur.log( loglevel, 'Nouvelle tentative de %s : la tentative %s s'est terminée par : %s', retry_state.fn, retry_state.attempt_number, retry_state.outcome) @retry(stop=stop_after_attempt(3), before_sleep=my_before_sleep) def raise_my_exception() : lever MyException ("Échec") essayer: raise_my_exception() sauf RetryError : passer
Vous pouvez modifier les arguments d'un décorateur de nouvelle tentative selon vos besoins lors de son appel en utilisant la fonction retry_with attachée à la fonction encapsulée :
.. code de test :: @retry(stop=stop_after_attempt(3)) def raise_my_exception() : lever MyException ("Échec") essayer: raise_my_exception.retry_with(stop=stop_after_attempt(4))() sauf exception : passer print(raise_my_exception.statistiques)
.. sortie de test :: :cacher: ...
Si vous souhaitez utiliser des variables pour configurer les paramètres de nouvelle tentative, vous n'êtes pas obligé d'utiliser le décorateur de nouvelle tentative - vous pouvez plutôt utiliser directement Retrying :
.. code de test :: def never_good_enough(arg1) : lever une exception ('Argument invalide : {}'.format(arg1)) def try_never_good_enough(max_attempts=3) : retryer = Réessayer (stop=stop_after_attempt(max_attempts), reraise=True) retryer(never_good_enough, 'J'essaie vraiment')
Vous souhaiterez peut-être également modifier temporairement le comportement d'une fonction décorée, comme dans les tests, pour éviter des temps d'attente inutiles. Vous pouvez modifier/corriger l'attribut retry attaché à la fonction. Gardez à l'esprit qu'il s'agit d'un attribut en écriture seule, les statistiques doivent être lues à partir de l'attribut statistiques de la fonction.
.. code de test :: @retry(stop=stop_after_attempt(3), wait=wait_fixed(3)) def raise_my_exception() : lever MyException ("Échec") à partir de la maquette d'importation unittest avec mock.patch.object(raise_my_exception.retry, "wait", wait_fixed(0)) : essayer: raise_my_exception() sauf exception : passer print(raise_my_exception.statistiques)
.. sortie de test :: :cacher: ...
Tenacity vous permet de réessayer un bloc de code sans avoir besoin de l'envelopper dans une fonction isolée. Cela facilite l'isolement du bloc défaillant lors du partage du contexte. L'astuce consiste à combiner une boucle for et un gestionnaire de contexte.
.. code de test :: à partir de l'importation de ténacité Réessayer, RetryError, stop_after_attempt essayer: pour la tentative de nouvelle tentative (stop=stop_after_attempt(3)) : avec tentative : lever une exception ('Mon code échoue !') sauf RetryError : passer
Vous pouvez configurer tous les détails de la politique de nouvelle tentative en configurant l'objet Retrying.
Avec le code asynchrone, vous pouvez utiliser AsyncRetrying.
.. code de test :: à partir de l'importation de ténacité AsyncRetrying, RetryError, stop_after_attempt fonction def asynchrone() : essayer: async pour tentative dans AsyncRetrying(stop=stop_after_attempt(3)) : avec tentative : lever une exception ('Mon code échoue !') sauf RetryError : passer
Dans les deux cas, vous souhaiterez peut-être définir le résultat de la tentative afin qu'il soit disponible dans les stratégies de nouvelle tentative telles que retry_if_result
. Cela peut être fait en accédant à la propriété retry_state
:
.. code de test :: à partir de l'importation de ténacité AsyncRetrying, retry_if_result fonction def asynchrone() : async pour tentative dans AsyncRetrying(retry=retry_if_result(lambda x: x < 3)) : avec tentative : résultat = 1 # Quelques calculs complexes, appels de fonction, etc. sinon, try.retry_state.outcome.failed : tentative.retry_state.set_result(résultat) résultat de retour
Enfin, retry
fonctionne également sur les coroutines asyncio, Trio et Tornado (>= 4.5). Les sommeils se font également de manière asynchrone.
@ retry
async def my_asyncio_function ( loop ):
await loop . getaddrinfo ( '8.8.8.8' , 53 )
@ retry
async def my_async_trio_function ():
await trio . socket . getaddrinfo ( '8.8.8.8' , 53 )
@ retry
@ tornado . gen . coroutine
def my_async_tornado_function ( http_client , url ):
yield http_client . fetch ( url )
Vous pouvez même utiliser des boucles d'événements alternatives telles que Curio en passant la bonne fonction sleep :
@ retry ( sleep = curio . sleep )
async def my_async_curio_function ():
await asks . get ( 'https://example.org' )
reno est utilisé pour gérer les journaux de modifications. Jetez un œil à leurs documents d’utilisation.
La génération de doc compilera automatiquement les changelogs. Il vous suffit de les ajouter.
# Opens a template file in an editor
tox -e reno -- new some-slug-for-my-change --edit