执行定期行动

我在 Windows 上工作。我想每10秒执行一个函数 foo()
我该怎么做?

229149 次浏览

这将在每次调用 foo()之间插入10秒的睡眠时间,如果调用快速完成,这大约是您所要求的睡眠时间。

import time


while True:
foo()
time.sleep(10)

在后台线程中调用 foo()时执行其他操作

import time
import sys
import threading


def foo():
sys.stdout.write('({}) foo\n'.format(time.ctime()))


def foo_target():
while True:
foo()
time.sleep(10)


t = threading.Thread(target=foo_target)
t.daemon = True
t.start()
print('doing other things...')

如果您打算每10秒钟在一个 python 脚本中运行 foo () ,那么可以在这些行中执行一些操作。

import time


def foo():
print "Howdy"


while True:
foo()
time.sleep(10)

foo()结束时,创建一个 Timer,它在10秒后调用 foo()本身。
因为,Timer创建一个新的 thread来调用 foo()
你可以做其他的事情而不被阻碍。

import time, threading
def foo():
print(time.ctime())
threading.Timer(10, foo).start()


foo()


#output:
#Thu Dec 22 14:46:08 2011
#Thu Dec 22 14:46:18 2011
#Thu Dec 22 14:46:28 2011
#Thu Dec 22 14:46:38 2011

下面是使用 Thread 类的一个不错的实现: http://g-off.net/software/a-python-repeatable-threadingtimer-class

下面的代码更快更脏:

from threading import Timer
from time import sleep


def hello():
print "hello, world"
t = Timer(3,hello)
t.start()


t = Timer(3, hello)
t.start() # after 3 seconds, "hello, world" will be printed


# timer will wake up ever 3 seconds, while we do something else
while True:
print "do something else"
sleep(10)

您可以在不同的线程中执行任务。threading.Timer允许你在一段时间之后执行一次给定的回调,例如,如果你想执行你的任务,只要回调返回 True(这实际上是 glib.timeout_add提供的,但是你可能没有在窗口中安装它)或者直到你取消它,你可以使用这段代码:

import logging, threading, functools
import time


logging.basicConfig(level=logging.NOTSET,
format='%(threadName)s %(message)s')


class PeriodicTimer(object):
def __init__(self, interval, callback):
self.interval = interval


@functools.wraps(callback)
def wrapper(*args, **kwargs):
result = callback(*args, **kwargs)
if result:
self.thread = threading.Timer(self.interval,
self.callback)
self.thread.start()


self.callback = wrapper


def start(self):
self.thread = threading.Timer(self.interval, self.callback)
self.thread.start()


def cancel(self):
self.thread.cancel()




def foo():
logging.info('Doing some work...')
return True


timer = PeriodicTimer(1, foo)
timer.start()


for i in range(2):
time.sleep(2)
logging.info('Doing some other work...')


timer.cancel()

输出示例:

Thread-1 Doing some work...
Thread-2 Doing some work...
MainThread Doing some other work...
Thread-3 Doing some work...
Thread-4 Doing some work...
MainThread Doing some other work...

注意: 回调并不是在每个间隔执行时执行。间隔是线程在上次完成回调和下次调用回调之间等待的时间。

也许 计划模组计划模组能满足你的需要。

或者,考虑使用 计时器对象

简单地睡10秒或使用 threading.Timer(10,foo)将导致开始时间漂移。(你可能不关心这个,或者根据你的具体情况,它可能是一个重要的问题来源。)造成这种情况的原因有两个——线程的唤醒时间不准确或函数的执行时间不准确。

你可以在这篇文章的结尾看到一些结果,但首先是一个如何修复它的例子。您需要跟踪下一次调用函数的时间,而不是实际调用函数的时间,并考虑这种差异。

下面是一个略有偏差的版本:

import datetime, threading


def foo():
print datetime.datetime.now()
threading.Timer(1, foo).start()


foo()

其产出如下:

2013-08-12 13:05:36.483580
2013-08-12 13:05:37.484931
2013-08-12 13:05:38.485505
2013-08-12 13:05:39.486945
2013-08-12 13:05:40.488386
2013-08-12 13:05:41.489819
2013-08-12 13:05:42.491202
2013-08-12 13:05:43.492486
2013-08-12 13:05:44.493865
2013-08-12 13:05:45.494987
2013-08-12 13:05:46.496479
2013-08-12 13:05:47.497824
2013-08-12 13:05:48.499286
2013-08-12 13:05:49.500232

你可以看到亚秒计数是不断增加,因此,开始时间是“漂移”。

这是正确解释漂移的代码:

import datetime, threading, time


next_call = time.time()


def foo():
global next_call
print datetime.datetime.now()
next_call = next_call+1
threading.Timer( next_call - time.time(), foo ).start()


foo()

其产出如下:

2013-08-12 13:21:45.292565
2013-08-12 13:21:47.293000
2013-08-12 13:21:48.293939
2013-08-12 13:21:49.293327
2013-08-12 13:21:50.293883
2013-08-12 13:21:51.293070
2013-08-12 13:21:52.293393

在这里你可以看到,不再有任何增加,在次秒的时间。

如果事件确实频繁发生,您可能希望在单个线程中运行计时器,而不是为每个事件启动一个新线程。考虑到漂移因素,这看起来像是:

import datetime, threading, time


def foo():
next_call = time.time()
while True:
print datetime.datetime.now()
next_call = next_call+1;
time.sleep(next_call - time.time())


timerThread = threading.Thread(target=foo)
timerThread.start()

然而,您的应用程序不会正常退出,您需要终止计时器线程。如果希望在应用程序完成后正常退出,而不需要手动终止线程,则应使用

timerThread = threading.Thread(target=foo)
timerThread.daemon = True
timerThread.start()

下面是一个简单的基于单线程睡眠的漂移版本,但是当它检测到漂移时会尝试自动纠正。

注意: 只有满足以下3个合理假设,这种方法才会奏效:

  1. 时间周期远远大于正在执行的函数的执行时间
  2. 正在执行的函数在每个调用上花费的时间大致相同
  3. 调用之间的漂移量小于一秒

-

from datetime import timedelta
from datetime import datetime


def exec_every_n_seconds(n,f):
first_called=datetime.now()
f()
num_calls=1
drift=timedelta()
time_period=timedelta(seconds=n)
while 1:
time.sleep(n-drift.microseconds/1000000.0)
current_time = datetime.now()
f()
num_calls += 1
difference = current_time - first_called
drift = difference - time_period* num_calls
print "drift=",drift

没有找到一个解决方案使用生成器的时间。我只是设计了我自己的目的。

这种解决方案: 单线程,每个周期不实例化对象,使用时间生成器,在定时上坚如磐石,甚至可以达到 time模块的精度(不同于我在栈交换中尝试过的几种解决方案)。

注意: 对于 Python 2.x,将下面的 next(g)替换为 g.next()

import time


def do_every(period,f,*args):
def g_tick():
t = time.time()
while True:
t += period
yield max(t - time.time(),0)
g = g_tick()
while True:
time.sleep(next(g))
f(*args)


def hello(s):
print('hello {} ({:.4f})'.format(s,time.time()))
time.sleep(.3)


do_every(1,hello,'foo')

结果,例如:

hello foo (1421705487.5811)
hello foo (1421705488.5811)
hello foo (1421705489.5809)
hello foo (1421705490.5830)
hello foo (1421705491.5803)
hello foo (1421705492.5808)
hello foo (1421705493.5811)
hello foo (1421705494.5811)
hello foo (1421705495.5810)
hello foo (1421705496.5811)
hello foo (1421705497.5810)
hello foo (1421705498.5810)
hello foo (1421705499.5809)
hello foo (1421705500.5811)
hello foo (1421705501.5811)
hello foo (1421705502.5811)
hello foo (1421705503.5810)

请注意,这个例子包含了一个模拟 CPU 做一些其他事情,每个周期为.3秒。如果你每次都把它改成随机的,那就无所谓了。yield行中的最大值用于保护 sleep不受负数的影响,以防被调用的函数所用的时间超过指定的周期。在这种情况下,它将立即执行,并弥补下一次执行时间上的损失时间。