如何安排一个函数在 Flask 上每小时运行一次?

我有一个烧瓶网络托管没有访问 cron命令。

如何每小时执行一些 Python 函数?

204256 次浏览

您可以在 Flask 应用程序中使用 APScheduler,并通过它的接口运行作业:

import atexit


# v2.x version - see https://stackoverflow.com/a/38501429/135978
# for the 3.x version
from apscheduler.scheduler import Scheduler
from flask import Flask


app = Flask(__name__)


cron = Scheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()


@cron.interval_schedule(hours=1)
def job_function():
# Do your work here




# Shutdown your cron thread if the web process is stopped
atexit.register(lambda: cron.shutdown(wait=False))


if __name__ == '__main__':
app.run()

您可能希望对调度程序(如 RQ 调度器)或更重的程序(如芹菜)使用某种队列机制(最有可能是过度使用)。

您可以使用 APScheduler软件包(v3.5.3)中的 BackgroundScheduler():

import time
import atexit


from apscheduler.schedulers.background import BackgroundScheduler




def print_date_time():
print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))




scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=60)
scheduler.start()


# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())

请注意,当 Flask 处于调试模式时,将启动其中两个调度程序。要了解更多信息,请查看 这个问题。

另一个选择可能是使用 Flask-APScheduler,它可以很好地与 Flask 一起工作,例如:

  • 从 Flask 配置加载调度程序配置,
  • 从 Flask 配置加载作业定义

更多资料点击这里:

Https://pypi.python.org/pypi/flask-apscheduler

您可以尝试使用 后台调度程序将间隔作业集成到 Flask 应用程序中。下面是使用蓝图和应用程序工厂(Init.py)的示例:

from datetime import datetime


# import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask


from webapp.models.main import db
from webapp.controllers.main import main_blueprint


# define the job
def hello_job():
print('Hello Job! The time is: %s' % datetime.now())


def create_app(object_name):
app = Flask(__name__)
app.config.from_object(object_name)
db.init_app(app)
app.register_blueprint(main_blueprint)
# init BackgroundScheduler job
scheduler = BackgroundScheduler()
# in your case you could change seconds to hours
scheduler.add_job(hello_job, trigger='interval', seconds=3)
scheduler.start()


try:
# To keep the main thread alive
return app
except:
# shutdown if app occurs except
scheduler.shutdown()

希望对你有所帮助:)

参考:

  1. Https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/background.py

我对应用程序调度程序的概念还有点新,但是我在这里为 APScheduler v3.3.1找到的是一些不同的东西。我相信对于最新的版本来说,包的结构,类名等等都已经改变了,所以我在这里提出了一个新的解决方案,这是我最近提出的,集成了一个基本的 Flask 应用程序:

#!/usr/bin/python3
""" Demonstrating Flask, using APScheduler. """


from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask


def sensor():
""" Function for test purposes. """
print("Scheduler is alive!")


sched = BackgroundScheduler(daemon=True)
sched.add_job(sensor,'interval',minutes=60)
sched.start()


app = Flask(__name__)


@app.route("/home")
def home():
""" Function for test purposes. """
return "Welcome Home :) !"


if __name__ == "__main__":
app.run()

如果有人对这个示例的更新感兴趣,我也将留下这个 Gist 给你

以下是一些参考资料,以供将来阅读:

一个使用调度和多处理的完整示例,带有 on 和 off 控制和 run _ job ()参数 返回码简化,间隔设置为10秒,改为 every(2).hour.do()为2小时。时间表是相当令人印象深刻的它不漂移,我从来没有看到它超过100毫秒时间表。使用多处理代替线程处理,因为它有一个终止方法。

#!/usr/bin/env python3


import schedule
import time
import datetime
import uuid


from flask import Flask, request
from multiprocessing import Process


app = Flask(__name__)
t = None
job_timer = None


def run_job(id):
""" sample job with parameter """
global job_timer
print("timer job id={}".format(id))
print("timer: {:.4f}sec".format(time.time() - job_timer))
job_timer = time.time()


def run_schedule():
""" infinite loop for schedule """
global job_timer
job_timer = time.time()
while 1:
schedule.run_pending()
time.sleep(1)


@app.route('/timer/<string:status>')
def mytimer(status, nsec=10):
global t, job_timer
if status=='on' and not t:
schedule.every(nsec).seconds.do(run_job, str(uuid.uuid4()))
t = Process(target=run_schedule)
t.start()
return "timer on with interval:{}sec\n".format(nsec)
elif status=='off' and t:
if t:
t.terminate()
t = None
schedule.clear()
return "timer off\n"
return "timer status not changed\n"


if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)

您可以通过发出以下命令来测试:

$ curl http://127.0.0.1:5000/timer/on
timer on with interval:10sec
$ curl http://127.0.0.1:5000/timer/on
timer status not changed
$ curl http://127.0.0.1:5000/timer/off
timer off
$ curl http://127.0.0.1:5000/timer/off
timer status not changed

每隔10秒计时器就会向控制台发出一条计时器消息:

127.0.0.1 - - [18/Sep/2018 21:20:14] "GET /timer/on HTTP/1.1" 200 -
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0117sec
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0102sec

对于简单的解决方案,可以添加如下路由

@app.route("/cron/do_the_thing", methods=['POST'])
def do_the_thing():
logging.info("Did the thing")
return "OK", 200

然后 添加一个 unix cron 作业定期发送到这个端点。例如,为了每分钟运行一次,在终端键入 crontab -e并添加以下代码行:

* * * * * /opt/local/bin/curl -X POST https://YOUR_APP/cron/do_the_thing

(注意 curl 的路径必须是完整的,因为当作业运行时,它不会包含您的 PATH。你可以通过 which curl找到你的系统上卷曲的完整路径)

我喜欢这个,因为它很容易手动测试的工作,它没有额外的依赖关系,因为没有任何特殊的事情发生,这是很容易理解的。

保安

如果你想要密码保护你的 cron 作业,你可以 pip install Flask-BasicAuth,然后添加凭证到你的应用程序配置:

app = Flask(__name__)
app.config['BASIC_AUTH_REALM'] = 'realm'
app.config['BASIC_AUTH_USERNAME'] = 'falken'
app.config['BASIC_AUTH_PASSWORD'] = 'joshua'

密码保护作业端点:

from flask_basicauth import BasicAuth
basic_auth = BasicAuth(app)


@app.route("/cron/do_the_thing", methods=['POST'])
@basic_auth.required
def do_the_thing():
logging.info("Did the thing a bit more securely")
return "OK", 200

然后从你的老古董工作中称之为:

* * * * * /opt/local/bin/curl -X POST https://falken:joshua@YOUR_APP/cron/do_the_thing

我已经尝试使用烧瓶,而不是一个简单的调度程序,你需要安装的是

Pip3安装 flask _ apScheder

下面是我的代码示例:

from flask import Flask
from flask_apscheduler import APScheduler


app = Flask(__name__)
scheduler = APScheduler()


def scheduleTask():
print("This test runs every 3 seconds")


if __name__ == '__main__':
scheduler.add_job(id = 'Scheduled Task', func=scheduleTask, trigger="interval", seconds=3)
scheduler.start()
app.run(host="0.0.0.0")

您可以使用 烧瓶-酒瓶模块,这是相当容易的。

步骤1: pip install flask-crontab

第二步:

from flask import Flask
from flask_crontab import Crontab


app = Flask(__name__)
crontab = Crontab(app)

第三步:

@crontab.job(minute="0", hour="6", day="*", month="*", day_of_week="*")
def my_scheduled_job():
do_something()

第四步: 在 cmd,点击

flask crontab add

完成。现在只需运行您的烧瓶应用程序,您可以检查您的函数将在每天6:00调用。

你可以参考 给你(正式文件)。