如何重复执行一个函数每x秒?

我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

关于用Python实现的cron的问题中,解决方案似乎有效地只睡眠() x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
# Code executed here
time.sleep(60)

这段代码是否存在任何可预见的问题?

793762 次浏览

如果你的程序还没有事件循环,可以使用固定播送时间模块,它实现了一个通用的事件调度器。

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc):
print("Doing stuff...")
# do your stuff
sc.enter(60, 1, do_something, (sc,))


s.enter(60, 1, do_something, (s,))
s.run()

如果你已经在使用事件循环库,如asynciotriotkinterPyQt5gobjectkivy等,只需使用现有事件循环库的方法来调度任务。

它和cron之间的主要区别是异常会永久地杀死守护进程。您可能希望使用异常捕获器和记录器进行包装。

你可能想要考虑扭曲的,它是一个Python网络库,实现了反应器模式

from twisted.internet import task, reactor


timeout = 60.0 # Sixty seconds


def doWork():
#do work here
pass


l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds


reactor.run()

虽然“While True: sleep(60)”可能会工作,Twisted可能已经实现了许多你最终需要的功能(如bobince指出的守护进程化、日志记录或异常处理),并且可能是一个更健壮的解决方案

我认为更简单的方法是:

import time


def executeSomething():
#code here
time.sleep(60)


while True:
executeSomething()
这样你的代码被执行,然后等待60秒,然后它再次执行,等待,执行,等等… 没有必要把事情复杂化:D

我以前也遇到过类似的问题。可能http://cronus.readthedocs.org可能有帮助?

对于v0.2,下面的代码片段可以工作

import cronus.beat as beat


beat.set_rate(2) # 2 Hz
while beat.true():
# do some time consuming work here
beat.sleep() # total loop duration would be 0.5 sec

像这样将你的时间循环锁定到系统时钟上:

import time
starttime = time.time()
while True:
print("tick")
time.sleep(60.0 - ((time.time() - starttime) % 60.0))

如果你想要一种非阻塞的方式来周期性地执行你的函数,而不是阻塞无限循环,我会使用线程计时器。这样,您的代码可以继续运行并执行其他任务,并且仍然每n秒调用一次您的函数。我经常使用这种技术打印长时间、CPU/磁盘/网络密集型任务的进度信息。

下面是我在类似问题中发布的代码,带有start()和stop()控件:

from threading import Timer


class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer     = None
self.interval   = interval
self.function   = function
self.args       = args
self.kwargs     = kwargs
self.is_running = False
self.start()


def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)


def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True


def stop(self):
self._timer.cancel()
self.is_running = False

用法:

from time import sleep


def hello(name):
print "Hello %s!" % name


print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!

特点:

  • 只有标准库,没有外部依赖
  • start()stop()可以安全地多次调用,即使计时器已经启动/停止
  • 要调用的函数可以有位置参数和命名参数
  • 你可以随时更改interval,它将在下次运行后生效。对于argskwargs甚至function也是一样!

下面是MestreLion对代码的更新,它可以避免随着时间的推移而漂移。

这里的RepeatedTimer类按照OP的请求每隔“间隔”秒调用给定函数;调度并不取决于函数执行的时间。我喜欢这个解决方案,因为它没有外部库依赖关系;这是纯python。

import threading
import time


class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()


def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)


def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True


def stop(self):
self._timer.cancel()
self.is_running = False

示例用法(摘自MestreLion的回答):

from time import sleep


def hello(name):
print "Hello %s!" % name


print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!

我用这个方法使每小时产生60个事件,其中大多数事件在整分钟后的相同秒数内发生:

import math
import time
import random


TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging


def set_timing():


now = time.time()
elapsed = now - info['begin']
minutes = math.floor(elapsed/TICK)
tick_elapsed = now - info['completion_time']
if (info['tick']+1) > minutes:
wait = max(0,(TICK_TIMING-(time.time() % TICK)))
print ('standard wait: %.2f' % wait)
time.sleep(wait)
elif tick_elapsed < TICK_MINIMUM:
wait = TICK_MINIMUM-tick_elapsed
print ('minimum wait: %.2f' % wait)
time.sleep(wait)
else:
print ('skip set_timing(); no wait')
drift = ((time.time() - info['begin']) - info['tick']*TICK -
TICK_TIMING + info['begin']%TICK)
print ('drift: %.6f' % drift)


info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK


while 1:


set_timing()


print('hello world')


#random real world event
time.sleep(random.random()*TICK_MINIMUM)


info['tick'] += 1
info['completion_time'] = time.time()

根据实际情况,你可能会得到长度的刻度:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

但在60分钟结束时,你会有60个滴答;而且它们中的大多数都将出现在您喜欢的正确偏移时间。

在我的系统上,我得到了典型的漂移<1/20秒,直到需要修正。

该方法的优点是具有较好的时钟漂移分辨率;这可能会导致问题,如果你做的事情,比如每tick追加一个项目,你希望每小时追加60个项目。未能考虑漂移可能导致次要指标,如移动平均线,将数据考虑得过于深入过去,从而导致错误的输出。

一个可能的答案是:

import time
t=time.time()


while True:
if time.time()-t>10:
#run your task here
t=time.time()

例如:显示当前本地时间

import datetime
import glib
import logger


def get_local_time():
current_time = datetime.datetime.now().strftime("%H:%M")
logger.info("get_local_time(): %s",current_time)
return str(current_time)


def display_local_time():
logger.info("Current time is: %s", get_local_time())
return True


# call every minute
glib.timeout_add(60*1000, display_local_time)

我使用Tkinter after()方法,它不会“窃取游戏”(就像前面介绍的固定播送时间模块),即它允许其他东西并行运行:

import Tkinter


def do_something1():
global n1
n1 += 1
if n1 == 6: # (Optional condition)
print "* do_something1() is done *"; return
# Do your stuff here
# ...
print "do_something1() "+str(n1)
tk.after(1000, do_something1)


def do_something2():
global n2
n2 += 1
if n2 == 6: # (Optional condition)
print "* do_something2() is done *"; return
# Do your stuff here
# ...
print "do_something2() "+str(n2)
tk.after(500, do_something2)


tk = Tkinter.Tk();
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()do_something2()可以并行运行,并且可以以任意的间隔速度运行。在这里,第2个将以两倍的速度执行。还要注意,我使用了一个简单的计数器作为终止任一函数的条件。你可以使用任何你喜欢的条件,或者不使用,如果你想让一个函数运行到程序终止(例如一个时钟)。

import time, traceback


def every(delay, task):
next_time = time.time() + delay
while True:
time.sleep(max(0, next_time - time.time()))
try:
task()
except Exception:
traceback.print_exc()
# in production code you might want to have this instead of course:
# logger.exception("Problem while executing repetitive task.")
# skip tasks if we are behind schedule:
next_time += (time.time() - next_time) // delay * delay + delay


def foo():
print("foo", time.time())


every(5, foo)

如果你想在不阻塞剩余代码的情况下这样做,你可以使用这个让它在自己的线程中运行:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

该解决方案结合了其他解决方案中很少结合的几个特性:

  • 异常处理:在这个级别上,异常会被正确处理,也就是说,在不中止程序的情况下记录调试目的。
  • 你在许多答案中发现的常见链式实现(用于调度下一个事件)是脆弱的,因为如果调度机制(threading.Timer或其他)中出现任何错误,这将终止链。即使问题的原因已经解决,也不会继续执行。相比之下,使用简单的sleep()进行简单的循环和等待要健壮得多。
  • 没有漂移:我的解决方案保持一个精确的时间轨道,它应该运行在。不存在依赖于执行时间的漂移(在许多其他解决方案中)。
  • 跳过:我的解决方案将跳过任务,如果一个执行花费了太多的时间(例如每五秒做X,但X花了6秒)。这是标准的cron行为(有一个很好的理由)。许多其他解决方案只是连续执行任务几次,没有任何延迟。对于大多数情况(例如清理任务),这是不希望的。如果它愿意,只需使用next_time += delay即可。
    ''' tracking number of times it prints'''
import threading


global timeInterval
count=0
def printit():
threading.Timer(timeInterval, printit).start()
print( "Hello, World!")
global count
count=count+1
print(count)
printit


if __name__ == "__main__":
timeInterval= int(input('Enter Time in Seconds:'))
printit()
这是MestreLion代码的改编版本。 除了原来的函数,这段代码:

1)添加用于在特定时间触发计时器的first_interval(调用者需要计算first_interval并传递进来)

2)在原代码中解决一个竞态条件。在原始代码中,如果控制线程未能取消正在运行的计时器(“停止计时器,并取消计时器动作的执行。这只在计时器仍处于等待阶段时才会工作。”引用自https://docs.python.org/2/library/threading.html),计时器将无休止地运行。

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
self.timer      = None
self.first_interval = first_interval
self.interval   = interval
self.func   = func
self.args       = args
self.kwargs     = kwargs
self.running = False
self.is_started = False


def first_start(self):
try:
# no race-condition here because only control thread will call this method
# if already started will not start again
if not self.is_started:
self.is_started = True
self.timer = Timer(self.first_interval, self.run)
self.running = True
self.timer.start()
except Exception as e:
log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
raise


def run(self):
# if not stopped start again
if self.running:
self.timer = Timer(self.interval, self.run)
self.timer.start()
self.func(*self.args, **self.kwargs)


def stop(self):
# cancel current timer in case failed it's still OK
# if already stopped doesn't matter to stop again
if self.timer:
self.timer.cancel()
self.running = False

我最终使用了时间表模块。API很好。

import schedule
import time


def job():
print("I'm working...")


schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)


while True:
schedule.run_pending()
time.sleep(1)

这里是另一个不使用任何额外库的解决方案。

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
"""Delay using a boolean callable function.


`condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
It can break early if condition is met.


Args:
condition_fn     - a callable boolean function
interval_in_sec  - wait time between calling `condition_fn`
timeout_in_sec   - maximum time to run


Returns: None
"""
start = last_call = time.time()
while time.time() - start < timeout_in_sec:
if (time.time() - last_call) > interval_in_sec:
if condition_fn() is True:
break
last_call = time.time()

如果漂移不是一个问题

import threading, time


def print_every_n_seconds(n=2):
while True:
print(time.ctime())
time.sleep(n)
    

thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()


异步输出。

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

如果正在运行的任务需要相当多的时间,那么间隔就变成2秒+任务时间,所以如果您需要精确的调度,那么这并不适合您。

注意daemon=True标志意味着这个线程不会阻止应用程序关闭。例如,在运行测试等待此头停止后,pytest将无限期挂起的问题。

可选的灵活性解决方案是Apscheduler

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
pass


sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds


sched.start()

另外,apscheduler提供了如下所示的许多调度程序。

  • BlockingScheduler:当调度程序是进程中唯一运行的程序时使用

  • BackgroundScheduler:当你不使用下面的任何框架,并且希望调度器在应用程序的后台运行时使用

  • AsyncIOScheduler:如果您的应用程序使用asyncio模块,则使用

  • GeventScheduler:如果您的应用程序使用gevent则使用

  • TornadoScheduler:如果您正在构建一个Tornado应用程序,则使用它

  • TwistedScheduler:如果您正在构建一个Twisted应用程序,则使用

  • QtScheduler:如果您正在构建Qt应用程序,则使用

简单地使用

import time


while True:
print("this will run after every 30 sec")
#Your code here
time.sleep(30)

我认为这取决于你想做什么,你的问题没有详细说明。

对我来说,我想在一个已经多线程的进程中做一个昂贵的操作。所以我让leader流程检查时间,只有她做昂贵的操作(检查点深度学习模型)。为了做到这一点,我增加了计数器,以确保5秒、10秒、15秒过去,每5秒保存一次(或使用math.floor的模块化算术):

def print_every_5_seconds_have_passed_exit_eventually():
"""
https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
:return:
"""
opts = argparse.Namespace(start=time.time())
next_time_to_print = 0
while True:
current_time_passed = time.time() - opts.start
if current_time_passed >= next_time_to_print:
next_time_to_print += 5
print(f'worked and {current_time_passed=}')
print(f'{current_time_passed % 5=}')
print(f'{math.floor(current_time_passed % 5) == 0}')
starting __main__ at __init__
worked and current_time_passed=0.0001709461212158203
current_time_passed % 5=0.0001709461212158203
True
worked and current_time_passed=5.0
current_time_passed % 5=0.0
True
worked and current_time_passed=10.0
current_time_passed % 5=0.0
True
worked and current_time_passed=15.0
current_time_passed % 5=0.0
True

对我来说,检查if语句是我所需要的。在我已经复杂的多处理器多gpu代码中拥有线程,调度器并不是我想要添加的复杂性,如果我可以避免它,似乎我可以。检查worker id很容易确保只有一个进程在做这件事。

注意,我使用True print语句来确保模块化的算术技巧有效,因为检查确切的时间显然是行不通的!但令我惊喜的是,地板竟然起了作用。

间隔计时器可以做到高精度(即<1毫秒),因为它是同步到系统时钟。它不会随着时间的推移而漂移,也不受代码执行时间长度的影响(当然,前提是它小于间隔时间)。

一个简单的阻塞例子:

from interval_timer import IntervalTimer


for interval in IntervalTimer(60):
# Execute code here
...

你可以通过在线程中运行它来轻松地使它非阻塞:

from threading import Thread
from interval_timer import IntervalTimer


def periodic():
for interval in IntervalTimer(60):
# Execute code here
...


thread = Thread(target=periodic)
thread.start()