定时器-每秒重复一次功能

我希望每0.5秒启动一个函数,并且能够启动、停止和重置计时器。我不太了解 Python 线程是如何工作的,并且在使用 Python 计时器时遇到了困难。

但是,当执行两次 threading.timer.start()时,我仍然得到 RuntimeError: threads can only be started once。有什么办法吗?我试着在每次开始之前应用 threading.timer.cancel()

伪代码:

t=threading.timer(0.5,function)
while True:
t.cancel()
t.start()
283121 次浏览

最好的方法是启动计时器线程一次

class MyThread(Thread):
def __init__(self, event):
Thread.__init__(self)
self.stopped = event


def run(self):
while not self.stopped.wait(0.5):
print("my thread")
# call a function

在启动计时器的代码中,可以通过 set停止事件来停止计时器。

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()

来自 Python 中 setInterval 的等价物:

import threading


def setInterval(interval):
def decorator(function):
def wrapper(*args, **kwargs):
stopped = threading.Event()


def loop(): # executed in another thread
while not stopped.wait(interval): # until stopped
function(*args, **kwargs)


t = threading.Thread(target=loop)
t.daemon = True # stop if the program exits
t.start()
return stopped
return wrapper
return decorator

Usage:

@setInterval(.5)
def function():
"..."


stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set()

或者这里是 相同的功能,但作为一个独立的功能,而不是一个装饰:

cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls()

下面介绍如何在不使用线程的情况下完成此操作。

Using timer threads-

from threading import Timer,Thread,Event




class perpetualTimer():


def __init__(self,t,hFunction):
self.t=t
self.hFunction = hFunction
self.thread = Timer(self.t,self.handle_function)


def handle_function(self):
self.hFunction()
self.thread = Timer(self.t,self.handle_function)
self.thread.start()


def start(self):
self.thread.start()


def cancel(self):
self.thread.cancel()


def printer():
print 'ipsem lorem'


t = perpetualTimer(5,printer)
t.start()

这可以被 t.cancel()阻止

from threading import Timer
def TaskManager():
#do stuff
t = Timer( 1, TaskManager )
t.start()


TaskManager()

这里是一个小样本,它将有助于更好地理解它是如何运行的。 函数 taskManager ()在最后创建对其自身的延迟函数调用。

尝试改变“ dalay”变量,你将能够看到差异

from threading import Timer, _sleep


# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True


def taskManager():


global counter, DATA, delay, allow_run
counter += 1


if len(DATA) > 0:
if FIFO:
print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
else:
print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")


else:
print("["+str(counter)+"] no data")


if allow_run:
#delayed method/function call to it self
t = Timer( dalay, taskManager )
t.start()


else:
print(" END task-manager: disabled")


# ------------------------------------------
def main():


DATA.append("data from main(): 0")
_sleep(2)
DATA.append("data from main(): 1")
_sleep(2)




# ------------------------------------------
print(" START task-manager:")
taskManager()


_sleep(2)
DATA.append("first data")


_sleep(2)
DATA.append("second data")


print(" START main():")
main()
print(" END main():")


_sleep(2)
DATA.append("last data")


allow_run = False

为了按照 OP 的要求使用 Timer 提供正确的答案,我将改进 Swapnil Jariwala 的回答:

from threading import Timer




class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""


def __init__(self, seconds, target):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.thread = None


def _handle_target(self):
self.is_running = True
self.target()
self.is_running = False
self._start_timer()


def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(self.seconds, self._handle_target)
self.thread.start()


def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print("Timer already started or running, please wait if you're restarting.")


def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")




def tick():
print('ipsem lorem')


# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()

为了一个项目,我不得不这么做。我最后做的是为函数启动一个单独的线程

t = threading.Thread(target =heartbeat, args=(worker,))
t.start()

心跳是我的函数 worker 是我的参数之一

inside of my heartbeat function:

def heartbeat(worker):


while True:
time.sleep(5)
#all of my code

因此,当我启动线程时,函数将重复等待5秒钟,运行我所有的代码,并且无限期地这样做。如果要终止进程,只需终止线程。

我已经实现了一个作为计时器的类。

我把链接留在这里,以防有人需要: Https://github.com/ivanhalencp/python/tree/master/xtimer

我修改了 swapnil-jariwala 代码中的一些代码,以制作一个小控制台时钟。

from threading import Timer, Thread, Event
from datetime import datetime


class PT():


def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)


def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()


def start(self):
self.thread.start()


def printer():
tempo = datetime.today()
h,m,s = tempo.hour, tempo.minute, tempo.second
print(f"{h}:{m}:{s}")




t = PT(1, printer)
t.start()

输出

>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...

带有 tkinter 图形界面的定时器

这段代码将时钟计时器放在一个带有 tkinter 的小窗口中

from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk


app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()




class perpetualTimer():


def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)


def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()


def start(self):
self.thread.start()


def cancel(self):
self.thread.cancel()




def printer():
tempo = datetime.today()
clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
try:
lab['text'] = clock
except RuntimeError:
exit()




t = perpetualTimer(1, printer)
t.start()
app.mainloop()

抽认卡游戏的一个例子(类似)

from threading import Timer, Thread, Event
from datetime import datetime




class perpetualTimer():


def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)


def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()


def start(self):
self.thread.start()


def cancel(self):
self.thread.cancel()




x = datetime.today()
start = x.second




def printer():
global questions, counter, start
x = datetime.today()
tempo = x.second
if tempo - 3 > start:
show_ans()
#print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
print()
print("-" + questions[counter])
counter += 1
if counter == len(answers):
counter = 0




def show_ans():
global answers, c2
print("It is {}".format(answers[c2]))
c2 += 1
if c2 == len(answers):
c2 = 0




questions = ["What is the capital of Italy?",
"What is the capital of France?",
"What is the capital of England?",
"What is the capital of Spain?"]


answers = "Rome", "Paris", "London", "Madrid"


counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()

产出:

Get ready to answer
>>>
-What is the capital of Italy?
It is Rome


-What is the capital of France?
It is Paris


-What is the capital of England?
...

Hans Then 的回答上稍作改进,我们可以只子类化 Timer 函数。下面是我们的 完整的“重复计时器”代码,它可以用作线程的下拉替换。带有相同参数的计时器:

from threading import Timer


class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)

用法例子:

def dummyfn(msg="foo"):
print(msg)


timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

产生以下输出:

foo
foo
foo
foo

还有

timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

生产

bar
bar
bar
bar

我喜欢右2点击的答案,特别是它不需要一个线程被拆除,并创建一个新的每次计时器滴答。此外,使用定期调用的计时器回调创建类也很容易。这是我通常的用例:

class MyClass(RepeatTimer):
def __init__(self, period):
super().__init__(period, self.on_timer)


def on_timer(self):
print("Tick")




if __name__ == "__main__":
mc = MyClass(1)
mc.start()
time.sleep(5)
mc.cancel()

这是一个使用函数而不是类的替代实现。

Because wait is more accurate than sleep ( it takes function runtime into account ):

import threading


PING_ON = threading.Event()


def ping():
while not PING_ON.wait(1):
print("my thread %s" % str(threading.current_thread().ident))


t = threading.Thread(target=ping)
t.start()


sleep(5)
PING_ON.set()

I have come up with another solution with SingleTon class. Please tell me if any memory leakage is here.

import time,threading


class Singleton:
__instance = None
sleepTime = 1
executeThread = False


def __init__(self):
if Singleton.__instance != None:
raise Exception("This class is a singleton!")
else:
Singleton.__instance = self


@staticmethod
def getInstance():
if Singleton.__instance == None:
Singleton()
return Singleton.__instance




def startThread(self):
self.executeThread = True
self.threadNew = threading.Thread(target=self.foo_target)
self.threadNew.start()
print('doing other things...')




def stopThread(self):
print("Killing Thread ")
self.executeThread = False
self.threadNew.join()
print(self.threadNew)




def foo(self):
print("Hello in " + str(self.sleepTime) + " seconds")




def foo_target(self):
while self.executeThread:
self.foo()
print(self.threadNew)
time.sleep(self.sleepTime)


if not self.executeThread:
break




sClass = Singleton()
sClass.startThread()
time.sleep(5)
sClass.getInstance().stopThread()


sClass.getInstance().sleepTime = 2
sClass.startThread()

除了上述使用 Threads 的绝佳答案之外,如果您必须使用主线程或者更喜欢异步方法,我还在 Aio _ timer Timer 类周围包装了一个简短的类(以启用重复)

import asyncio
from aio_timers import Timer


class RepeatingAsyncTimer():
def __init__(self, interval, cb, *args, **kwargs):
self.interval = interval
self.cb = cb
self.args = args
self.kwargs = kwargs
self.aio_timer = None
self.start_timer()
    

def start_timer(self):
self.aio_timer = Timer(delay=self.interval,
callback=self.cb_wrapper,
callback_args=self.args,
callback_kwargs=self.kwargs
)
    

def cb_wrapper(self, *args, **kwargs):
self.cb(*args, **kwargs)
self.start_timer()




from time import time
def cb(timer_name):
print(timer_name, time())


print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')

时钟开始于: 1602438840.9690785

Timer _ 11602438845.980087

timer_2 1602438850.9806316

Timer _ 11602438850.9808934

timer_1 1602438855.9863033

Timer _ 21602438860.9868324

Timer _ 11602438860.9876585

这是连续运行计时器的示例代码。只需在用尽时创建一个新的计时器并调用相同的函数。这不是最好的方法,但也可以这样做。

import threading
import time




class ContinousTimer():
def __init__(self):
self.timer = None


def run(self, msg='abc'):
print(msg)


self.timer = threading.Timer(interval=2, function=self.run, args=(msg, ))
self.timer.start()




if __name__ == "__main__":
t = ContinousTimer()
try:
t.run(msg="Hello")
while True:
time.sleep(0.1)
except KeyboardInterrupt:
# Cancel Timer
t.timer.cancel()

我有点迟到了,但我的建议是:

您可以通过重复调用 threading.Timer对象的 .run()方法来重用它,如下所示:

class SomeClassThatNeedsATimer:
def __init__(...):
self.timer = threading.Timer(interval, self.on_timer)
self.timer.start()


def on_timer(self):
print('On timer')
self.timer.run()