发布时间:2025-12-10 23:10:44 浏览次数:1
backoff 模块简介及安装
这个模块主要提供了是一个装饰器,用于装饰函数,使得它在遇到某些条件时会重试(即反复执行被装饰的函数)。通常适用于我们在获取一些不可靠资源,比如会间歇性故障的资源等。
此外,装饰器支持正常的同步方法,也支持异步asyncio代码。
backoff 模块的安装也很简单,通过 pip 即可安装完成:
pipinstallbackoff
backoff 用法及简单源码分析
backoff 提供两个主要的装饰器,通过 backoff. 调用,通过提示我们可以看到这两个装饰器,分别是:
backoff.on_predicatebackoff.on_exception
通过 github 查看 backoff 的源码,源码目录 backoff/_decorator.py,定义如下:
defon_predicate(wait_gen,predicate=operator.not_,max_tries=None,max_time=None,jitter=full_jitter,on_success=None,on_backoff=None,on_giveup=None,logger='backoff',**wait_gen_kwargs):#省略具体代码#每个参数的定义在源码中都给出了明确的解释passdefon_exception(wait_gen,exception,max_tries=None,max_time=None,jitter=full_jitter,giveup=lambdae:False,on_success=None,on_backoff=None,on_giveup=None,logger='backoff',**wait_gen_kwargs):#省略具体代码#每个参数的定义在源码中都给出了明确的解释pass
可以看到,定义了很多的参数,这些参数在源码中都给出了比较详细的解释,这里做简单的介绍:
首先,wait_gen:表示每次循环等待的时长,以秒为单位。它的类型是一个生成器,在 backoff 中内置了三个生成器。我们查看下源码,目录为 backoff/_wait_gen.py。我们取其中一个的详细实现来看下:
#省略实现代码#base*factor*ndefexpo(base=2,factor=1,max_value=None):"""Generatorforexponentialdecay.Args:base:Themathematicalbaseoftheexponentiationoperationfactor:Factortomultiplytheexponentationby.max_value:Themaximumvaluetoyield.Oncethevalueinthetrueexponentialsequenceexceedsthis,thevalueofmax_valuewillforeverafterbeyielded."""n=0whileTrue:a=factor*base**nifmax_valueisNoneora<max_value:yieldan+=1else:yieldmax_value#通过斐波那契数列控制deffibo(max_value=None):pass#常量数值defconstant(interval=1):pass
从源码不难看出,通过一些策略,每次 yield 返回不同的数值,这些数值就是重试等待秒数。当然因为这个参数类型是生成器,显然我们也是可以自定义的。同时我们会发现每个 wait_gen 都是参数控制的,所以我们理应是可以修改这个参数的初始值的。
显然,wait_gen_kwargs就是用来传递这些参数的,它是通过可变关键字参数控制的,可以直接用 key=value 的形式进行传参,简单示例如下:
@backoff.on_predicate(backoff.constant,interval=5)defmain3():print("timeis{}retry...".format(time.time()))predict 与 exception。这两个相对比较简单,predict 接受一个函数,当这个函数返回 True 时会进行重试,否则停止,同时这个函数接受一个参数,这个参数的值是被装饰函数的返回值。这个参数的默认值是:operator._not。这个函数的源码如下:
defnot_(a):"Sameasnota."returnnota
所以默认返回的是 not 被装饰函数的返回值。如果当被装饰函数并没有返回值时,返回 True,会进行重试。
示例代码如下:
importbackoffimporttime@backoff.on_predicate(backoff.fibo)deftest2():print("timeis{},retry...".format(time.time()))if__name__=="__main__":test2()#等价于:#必须接受一个参数,这个参数的值是被装饰函数的返回值defcondition(r):returnTrue@backoff.on_predicate(backoff.fibo,condition)deftest2():print("timeis{},retry...".format(time.time()))if__name__=="__main__":test2()执行结果如下:
$python3backoff_test.pytimeis1571801845.834578,retry...timeis1571801846.121314,retry...timeis1571801846.229812,retry...timeis1571801846.533237,retry...timeis1571801849.460303,retry...timeis1571801850.8974788,retry...timeis1571801856.498335,retry...timeis1571801861.56931,retry...timeis1571801872.701226,retry...timeis1571801879.198495,retry......
需要注意几点:
如果自定义这个参数对应的函数,这个函数是需要接受一个参数的,这个参数的值是被装饰函数的返回值。我们可以通过控制这个返回值来做一些条件判断,当达到某些特殊条件时重试结束。
示例中 wait_gen 用的是 backoff.fibo,注意观察输出的时间单隔,这里的时间间隔好像并不像我们想象中按 fibo 返回的时间间隔数,实际上如果想达到这个效果,我们需要将 jitter 参数设置为 None,后面介绍 jitter 参数时再做说明。
而 exception 则是接受异常类型的实例,可以是单个异常,也可以是元组形式的多个异常。简单示例如下:
importtimeimportrandomimportbackofffromcollectionsimportdequeclassMyException(Exception):def__init__(self,message,status):super().__init__(message,status)self.message=messageself.status=statusclassMyException2(Exception):pass@backoff.on_exception(backoff.expo,(MyException,MyException2))defmain():random_num=random.randint(0,9)print("retry...andrandomnumis{}".format(random_num))ifrandom_num%2==0:raiseMyException("myexception",int("1000"+str(random_num)))raiseMyException2()max_tries 与 max_time 也比较简单,分别代表最大重试次数与最长重试时间。这里就不做演示了。
@backoff.on_exception 中的 giveup,它接受一个异常实例,通过对这个实例做一些条件判断,达到判断是否需要继续循环的目的。如果返回 True,则结束,反之继续。默认值一直是返回 False,即会一直循环。示例如下:
importrandomimportbackoffclassMyException(Exception):def__init__(self,message,status):super().__init__(message,status)self.message=messageself.status=statusdefexception_status(e):print('exceptionstatuscodeis{}'.format(e.status))returne.status%2==0@backoff.on_exception(backoff.expo,MyException,giveup=exception_status)defmain():random_num=random.randint(0,9)print("retry...andrandomnumis{}".format(random_num))raiseMyException("myexception",int("1000"+str(random_num)))if__name__=="__main__":main()运行结果:
retry...andrandomnumis5exceptionstatuscodeis10005retry...andrandomnumis0exceptionstatuscodeis10000#会再走一遍raise的代码,所以异常仍然会抛出来Traceback(mostrecentcalllast):File"backoff_test.py",line36,in<module>main()File"/Users/ruoru/code/python/exercise/.venv/lib/python3.7/site-packages/backoff/_sync.py",line94,inretryret=target(*args,**kwargs)File"backoff_test.py",line32,inmainraiseMyException("myexception",int("1000"+str(random_num)))__main__.MyException:('myexception',10000)需要注意两点:
这个参数接受的函数仍然只有一个参数,这个参数的值是一个异常实例对象
从结果我们可以看出,当抛出异常时,会先进入 giveup 接受的函数,如果函数判断需要 giveup 时,当前的异常仍然会抛出。所以有需要,代码仍然需要做异常逻辑处理。
on_success、on_backoff 与 on_giveup 这三个是一类的参数,用于做事件处理:
on_sucess 事件会比较难理解一点,它表示的是被装饰函数成功结束轮循则会退出,对于 on_exception 来说即当被装饰函数没有发生异常时则会调用 on_success。而对于 on_predicate 来说即是通过 predicate 关键字返回为 False 结束循环则会调用。
on_backoff 即当程序产生循环时会调用
on_giveup 当程序是达到当前可尝试最大次数后,会调用。对于 on_predicate 如果是通过 max_tries 或者 max_time 会调用,而对于 on_exception ,对于 exception 参数返回 True 时也会调用 on_giveup
总结来说,max_tries 和 max_time 这种直接控制结束的,调用的是 on_giveup,而 exception 参数也是通过返回 True 则程序就结束,它是用来控制程序结束的,所以也会调用 on_giveup。而 predicate 参数返回 True 则程序继续,它是用来控制程序是否继续徨的,所以当它结束时,调用的是 on_success。
实验代码如下:
'''@Author:ruoru@Date:2019-10-2215:30:32@LastEditors:ruoru@LastEditTime:2019-10-2314:37:13@Description:backoff'''importtimeimportrandomimportbackoffclassMyException(Exception):def__init__(self,status,message):super().__init__(status,message)self.status=statusself.message=messagedefbackoff_hdlr(details):print("Backingoff{wait:0.1f}secondsafters{tries}tries""callingfunction{target}withargs{args}andkwargs""{kwargs}".format(**details))defsuccess_hdlr(details):print("Successoffafters{tries}tries""callingfunction{target}withargs{args}andkwargs""{kwargs}".format(**details))defgiveup_hdlr(details):print("Giveupoff{tries}tries""callingfunction{target}withargs{args}andkwargs""{kwargs}".format(**details))@backoff.on_predicate(backoff.constant,#当randomnum不等10009则继续#当random_num等于10009后,会调用on_successlambdax:x!=10009,on_success=success_hdlr,on_backoff=backoff_hdlr,on_giveup=giveup_hdlr,max_time=2)defmain():num=random.randint(10000,10010)print("timeis{},numis{},retry...".format(time.time(),num))returnnum@backoff.on_exception(backoff.constant,MyException,#当Exception实例对象的status为10009成立时退出#当条件成立时,调用的是on_giveupgiveup=lambdae:e.status==10009,on_success=success_hdlr,on_backoff=backoff_hdlr,on_giveup=giveup_hdlr,)defmain2():num=random.randint(10000,10010)print("timeis{},numis{},retry...".format(time.time(),num))#如果是通过这个条件成立退出,调用的是on_successifnum==10010:returnraiseMyException(num,"hhh")if__name__=="__main__":#main()main2()logger 参数,很显然就是用来控制日志输出的,这里不做详细介绍。copy 官方文档的一个示例:
my_logger=logging.getLogger('my_logger')my_handler=logging.StreamHandler()my_logger.add_handler(my_handler)my_logger.setLevel(logging.ERROR)@backoff.on_exception(backoff.expo,requests.exception.RequestException,logger=my_logger)#...最后一个参数,jitter,开始也不是很明白这个参数的作用,文档的解释如下:
jitter: A function of the value yielded by wait_gen returning the actual time to wait. This distributes wait times stochastically in order to avoid timing collisions across concurrent clients. Wait times are jittered by default using the full_jitter function. Jittering may be disabled altogether by passing jitter=None.
有点晕,于是去看了下源码,明白了用法,截取关键源码如下:
#backoff/_decorator.pydefon_predicate(wait_gen,predicate=operator.not_,max_tries=None,max_time=None,jitter=full_jitter,on_success=None,on_backoff=None,on_giveup=None,logger='backoff',**wait_gen_kwargs):pass#省略#因为没有用到异步,所以会进到这里ifretryisNone:retry=_sync.retry_predicate#backoff/_sync#分析可以看到有一句获取下次wait时长seconds=_next_wait(wait,jitter,elapsed,max_time_)#backoff/_commondef_next_wait(wait,jitter,elapsed,max_time):value=next(wait)try:ifjitterisnotNone:seconds=jitter(value)else:seconds=valueexceptTypeError:warnings.warn("Nullaryjitterfunctionsignatureisdeprecated.Use""unarysignatureacceptingawaitvalueinsecondsand""returningajitteredversionofit.",DeprecationWarning,stacklevel=2,)seconds=value+jitter()#don'tsleeplongerthanremainingallotedmax_timeifmax_timeisnotNone:seconds=min(seconds,max_time-elapsed)returnseconds看前面几行代码应该就会比较清晰了,如果 jitter 为 None,则会使用第一个参数返回的 value 值,而如果使用了,则会在这个 value 值上再做一次算法,默认为 full_jitter(value)。backoff/_jitter.py 提供了两个算法,代码不长,贴上来看看:
importrandomdefrandom_jitter(value):"""Jitterthevaluearandomnumberofmilliseconds.Thisaddsupto1secondofadditionaltimetotheoriginalvalue.Priortobackoffversion1.2thiswasthedefaultjitterbehavior.Args:value:Theunadulteratedbackoffvalue."""returnvalue+random.random()deffull_jitter(value):"""Jitterthevalueacrossthefullrange(0tovalue).Thiscorrespondstothe"FullJitter"algorithmspecifiedintheAWSblog'spostontheperformanceofvariousjitteralgorithms.(http://www.awsarchitectureblog.com/2015/03/backoff.html)Args:value:Theunadulteratedbackoffvalue."""returnrandom.uniform(0,value)