异常后如何重试?

我有一个以for i in range(0, 100)开始的循环。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前我已经设置它,以便在失败时,它将在except子句中continue(继续到i的下一个数字)。

我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?

464558 次浏览

在for循环中执行while True,将try代码放入其中,只有当代码成功时才退出while循环。

for i in range(0,100):
while True:
try:
# do stuff
except SomeSpecificException:
continue
break

最清晰的方法是显式设置i。例如:

i = 0
while i < 100:
i += 1
try:
# do stuff


except MyException:
continue

只有当try子句成功时才增加循环变量

不使用那些丑陋的while循环的更“功能性”的方法:

def tryAgain(retries=0):
if retries > 10: return
try:
# Do stuff
except:
retries+=1
tryAgain(retries)


tryAgain()

Python装饰器库中有类似的东西。

请记住,它不测试异常,而是测试返回值。它会重新尝试,直到被修饰的函数返回True。

稍微修改一下版本就可以了。

我倾向于限制重试次数,这样如果某个特定项目出现问题,你就可以继续进行下一个项目,如下:

for i in range(100):
for attempt in range(10):
try:
# do thing
except:
# perhaps reconnect, etc.
else:
break
else:
# we failed all the attempts - deal with the consequences.

更新2021-12-01:

自2016年6月起,不再维护重试包。 考虑使用活动fork github.com/jd/tenacity,或者github.com/litl/backoff.


失败的计划是在失败时重试代码块的好方法。

例如:

@retry(wait_random_min=1000, wait_random_max=2000)
def wait_random_1_to_2_s():
print("Randomly wait 1 to 2 seconds between retries")

带超时的通用解决方案:

import time


def onerror_retry(exception, callback, timeout=2, timedelta=.1):
end_time = time.time() + timeout
while True:
try:
yield callback()
break
except exception:
if time.time() > end_time:
raise
elif timedelta > 0:
time.sleep(timedelta)

用法:

for retry in onerror_retry(SomeSpecificException, do_stuff):
retry()

以下是我关于如何解决这个问题的想法:

j = 19
def calc(y):
global j
try:
j = j + 8 - y
x = int(y/j)   # this will eventually raise DIV/0 when j=0
print("i = ", str(y), " j = ", str(j), " x = ", str(x))
except:
j = j + 1   # when the exception happens, increment "j" and retry
calc(y)
for i in range(50):
calc(i)

这里有一个与其他解决方案类似的解决方案,但是如果在规定的次数或重试次数内没有成功,它将引发异常。

tries = 3
for i in range(tries):
try:
do_the_thing()
except KeyError as e:
if i < tries - 1: # i is zero indexed
continue
else:
raise
break

使用while和计数器:

count = 1
while count <= 3:  # try 3 times
try:
# do_the_logic()
break
except SomeSpecificException as e:
# If trying 3rd time and still error??
# Just throw the error- we don't have anything to hide :)
if count == 3:
raise
count += 1

使用递归

for i in range(100):
def do():
try:
## Network related scripts
except SpecificException as ex:
do()
do() ## invoke do() whenever required inside this loop

你可以使用Python重试包。 重试 < / p >

它是用Python编写的,目的是简化向几乎任何东西添加重试行为的任务。

如果你想要一个没有嵌套循环的解决方案,并在成功时调用break,你可以为任何可迭代对象开发一个快速包装retriable。这里有一个我经常遇到的网络问题的例子——保存的身份验证过期。它的用法是这样的:

client = get_client()
smart_loop = retriable(list_of_values):


for value in smart_loop:
try:
client.do_something_with(value)
except ClientAuthExpired:
client = get_client()
smart_loop.retry()
continue
except NetworkTimeout:
smart_loop.retry()
continue

我在我的代码中使用following,

   for i in range(0, 10):
try:
#things I need to do
except ValueError:
print("Try #{} failed with ValueError: Sleeping for 2 secs before next try:".format(i))
time.sleep(2)
continue
break
for _ in range(5):
try:
# replace this with something that may fail
raise ValueError("foo")


# replace Exception with a more specific exception
except Exception as e:
err = e
continue


# no exception, continue remainder of code
else:
break


# did not break the for loop, therefore all attempts
# raised an exception
else:
raise err

我的版本与上面的几个类似,但没有使用单独的while循环,如果所有重试都失败,则重新引发最新的异常。可以显式地在顶部设置err = None,但不是严格必要的,因为它只应该在发生错误时执行最后的else块,因此设置了err

我最近用我的python解决了这个问题,我很高兴与stackoverflow的访问者分享,如果需要请给予反馈。

print("\nmonthly salary per day and year converter".title())
print('==' * 25)




def income_counter(day, salary, month):
global result2, result, is_ready, result3
result = salary / month
result2 = result * day
result3 = salary * 12
is_ready = True
return result, result2, result3, is_ready




i = 0
for i in range(5):
try:
month = int(input("\ntotal days of the current month: "))
salary = int(input("total salary per month: "))
day = int(input("Total Days to calculate> "))
income_counter(day=day, salary=salary, month=month)
if is_ready:
print(f'Your Salary per one day is: {round(result)}')
print(f'your income in {day} days will be: {round(result2)}')
print(f'your total income in one year will be: {round(result3)}')
break
else:
continue
except ZeroDivisionError:
is_ready = False
i += 1
print("a month does'nt have 0 days, please try again")
print(f'total chances left: {5 - i}')
except ValueError:
is_ready = False
i += 1
print("Invalid value, please type a number")
print(f'total chances left: {5 - i}')

attempts = 3
while attempts:
try:
...
...
<status ok>
break
except:
attempts -=1
else: # executed only break was not  raised
<status failed>

以下是我对这个问题的看法。下面的retry函数支持以下特性:

  • 当调用成功时返回被调用函数的值
  • 如果尝试失败,则引发被调用函数的异常
  • 尝试次数限制(0表示无限)
  • 在尝试之间等待(线性或指数)
  • 仅当异常是特定异常类型的实例时重试。
  • 可选的尝试记录
import time


def retry(func, ex_type=Exception, limit=0, wait_ms=100, wait_increase_ratio=2, logger=None):
attempt = 1
while True:
try:
return func()
except Exception as ex:
if not isinstance(ex, ex_type):
raise ex
if 0 < limit <= attempt:
if logger:
logger.warning("no more attempts")
raise ex


if logger:
logger.error("failed execution attempt #%d", attempt, exc_info=ex)


attempt += 1
if logger:
logger.info("waiting %d ms before attempt #%d", wait_ms, attempt)
time.sleep(wait_ms / 1000)
wait_ms *= wait_increase_ratio

用法:

def fail_randomly():
y = random.randint(0, 10)
if y < 10:
y = 0
return x / y




logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))


logger.info("starting")
result = retry.retry(fail_randomly, ex_type=ZeroDivisionError, limit=20, logger=logger)
logger.info("result is: %s", result)

更多信息见我的帖子

retrying的替代方案:tenacitybackoff(2020年更新)

重试库以前是走的路,但遗憾的是,它有一些bug,自2016年以来就没有任何更新。其他选项似乎是倒扣韧性。在写这篇文章的时候,tenacity有更多的GItHub星(2.3k vs 1.2k),并且最近更新了,因此我选择使用它。这里有一个例子:

from functools import partial
import random # producing random errors for this example


from tenacity import retry, stop_after_delay, wait_fixed, retry_if_exception_type


# Custom error type for this example
class CommunicationError(Exception):
pass


# Define shorthand decorator for the used settings.
retry_on_communication_error = partial(
retry,
stop=stop_after_delay(10),  # max. 10 seconds wait.
wait=wait_fixed(0.4),  # wait 400ms
retry=retry_if_exception_type(CommunicationError),
)()




@retry_on_communication_error
def do_something_unreliable(i):
if random.randint(1, 5) == 3:
print('Run#', i, 'Error occured. Retrying.')
raise CommunicationError()


for i in range(100):
do_something_unreliable(i)

上面的代码输出如下:

Run# 3 Error occured. Retrying.
Run# 5 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 10 Error occured. Retrying.
.
.
.

tenacity.retry的更多设置列在坚韧的GitHub页面中。

Decorator是一个很好的方法。

from functools import wraps
import time


class retry:
def __init__(self, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True):
self.success = success
self.times = times
self.raiseexception = raiseexception
self.echo = echo
self.delay = delay
def retry(fun, *args, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True, **kwargs):
ex = Exception(f"{fun} failed.")
r = None
for i in range(times):
if i > 0:
time.sleep(delay*2**(i-1))
try:
r = fun(*args, **kwargs)
s = success(r)
except Exception as e:
s = False
ex = e
# raise e
if not s:
continue
return r
else:
if echo:
print(f"{fun} failed.", "args:", args, kwargs, "\nresult: %s"%r)
if raiseexception:
raise ex
def __call__(self, fun):
@wraps(fun)
def wraper(*args, retry=0, **kwargs):
retry = retry if retry>0 else self.times
return self.__class__.retry(fun, *args,
success=self.success,
times=retry,
delay=self.delay,
raiseexception = self.raiseexception,
echo = self.echo,
**kwargs)
return wraper

一些用法示例:

@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf1(x=[]):
x.append(1)
print(x)
return len(x)
> rf1()


[1]
[1, 1]
[1, 1, 1]
[1, 1, 1, 1]


4
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf2(l=[], v=1):
l.append(v)
print(l)
assert len(l)>4
return len(l)
> rf2(v=2, retry=10) #overwite times=4


[2]
[2, 2]
[2, 2, 2]
[2, 2, 2, 2]
[2, 2, 2, 2, 2]


5
> retry.retry(lambda a,b:a+b, 1, 2, times=2)


3
> retry.retry(lambda a,b:a+b, 1, "2", times=2)


TypeError: unsupported operand type(s) for +: 'int' and 'str'

我喜欢使用bool值,如下所示:

success = False
num_try = 0
while success is False:
if num_try >= 10: # or any number
# handle error how  you please
try:
# code
success = True
except Exception as e:
# record or do something with exception if needed
num_try += 1

使用这个装饰器,您可以轻松地控制错误

class catch:
def __init__(self, max=1, callback=None):
self.max = max
self.callback = callback
    

def set_max(self, max):
self.max = max
    

def handler(self, *args, **kwargs):
self.index = 0
while self.index < self.max:
self.index += 1
try:
self.func(self, *args, **kwargs)
        

except Exception as error:
if callable(self.callback):
self.callback(self, error, args, kwargs)
                

def __call__(self, func):
self.func = func
return self.handler


import time
def callback(cls, error, args, kwargs):
print('func args', args, 'func kwargs', kwargs)
print('error', repr(error), 'trying', cls.index)
if cls.index == 2:
cls.set_max(4)
    

else:
time.sleep(1)
    

    

@catch(max=2, callback=callback)
def test(cls, ok, **kwargs):
raise ValueError('ok')


test(1, message='hello')

我使用这个,它可以用于任何函数:

def run_with_retry(func: callable, max_retries: int = 3, wait_seconds: int = 2, **func_params):
num_retries = 1
while True:
try:
return func(*func_params.values())
except Exception as e:
if num_retries > max_retries:
print('we have reached maximum errors and raising the exception')
raise e
else:
print(f'{num_retries}/{max_retries}')
print("Retrying error:", e)
num_retries += 1
sleep(wait_seconds)

像这样调用:

    def add(val1, val2):
return val1 + val2


run_with_retry(func=add, param1=10, param2=20)

如果你想要的是重试x次失败的尝试,那么一个for else循环可能就是你想要的。考虑这个例子,尝试了3次:

attempts = 3


for attempt in range(1, attempts+1):
try:
if attempt < 4:
raise TypeError(f"Error raised on attempt: {attempt}")
else:
print(f'Attempt {attempt} finally worked.')
except (TypeError) as error:
print(f'Attempt {attempt} hit the exception.')
continue
else:
break
else:
print(f'Exit after final attempt: {attempt}')


print(f'\nGo on to execute other code ...')

给出输出:

Attempt 1 hit the exception.
Attempt 2 hit the exception.
Attempt 3 hit the exception.
Exit after final attempt: 3


Go on to execute other code ...

再试一次它就成功了

attempts = 4

给出输出:

Attempt 1 hit the exception.
Attempt 2 hit the exception.
Attempt 3 hit the exception.
Attempt 4 finally worked.


Go on to execute other code ...

你可以有一个专门的函数使用return来短路结果。比如这样:

def my_function_with_retries(..., max_retries=100):
for attempt in range(max_retries):
try:
return my_function(...)
except SomeSpecificException as error:
logging.warning(f"Retrying after failed execution: {error}")


raise SomeOtherException()

这里有一个快速装饰器来处理这个问题。7行,没有依赖关系。

def retry(exception=Exception, retries=3, delay=0):
def wrap(func):
for i in range(retries):
try:
return func()
except exception as e:
print(f'Retrying {func.__name__}: {i}/{retries}')
time.sleep(delay)
raise e
return wrap


@retry()
def do_something():
...
@retry(HTTPError, retries=100, delay=3)
def download_something():
...

可以添加的一个功能是扩展异常以处理多个异常(splat一个列表)。

我喜欢laurent-laporte的 回答。下面是我的版本,它包装在一个类与静态方法和一些例子。我实现了重试计数作为另一种重试方式。还增加了kwargs。

from typing import List
import time




class Retry:
@staticmethod
def onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
errors: List = None, **kwargs):
"""


@param exception: The exception to trigger retry handling with.
@param callback: The function that will potentially fail with an exception
@param retries: Optional total number of retries, regardless of timing if this threshold is met, the call will
raise the exception.
@param timeout: Optional total amount of time to do retries after which the call will raise an exception
@param timedelta: Optional amount of time to sleep in between calls
@param errors: A list to receive all the exceptions that were caught.
@param kwargs: An optional key value parameters to pass to the function to retry.
"""
for retry in Retry.__onerror_retry(exception, callback, retries, timeout, timedelta, errors, **kwargs):
if retry: retry(**kwargs)  # retry will be None when all retries fail.


@staticmethod
def __onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
errors: List = None, **kwargs):
end_time = time.time() + timeout
continues = 0
while True:
try:
yield callback(**kwargs)
break
except exception as ex:
print(ex)
if errors:
errors.append(ex)


continues += 1
if 0 < retries < continues:
print('ran out of retries')
raise


if timeout > 0 and time.time() > end_time:
print('ran out of time')
raise
elif timedelta > 0:
time.sleep(timedelta)




err = 0


#
# sample dumb fail function
def fail_many_times(**kwargs):
global err
err += 1
max_errors = kwargs.pop('max_errors', '') or 1
if err < max_errors:
raise ValueError("I made boo boo.")
print("Successfully did something.")


#
# Example calls
try:
#
# retries with a parameter that overrides retries... just because
Retry.onerror_retry(ValueError, fail_many_times, retries=5, max_errors=3)
err = 0
#
# retries that run out of time, with 1 second sleep between retries.
Retry.onerror_retry(ValueError, fail_many_times, timeout=5, timedelta=1, max_errors=30)
except Exception as err:
print(err)