在 Flask 中执行异步任务

我正在用 Flask 编写一个应用程序,除了 WSGI是同步和阻塞的以外,这个应用程序运行得非常好。我特别有一个调用第三方 API 的任务,这个任务可能需要几分钟才能完成。我希望进行这个调用(实际上是一系列调用) ,然后让它运行。同时把控制权还给 Flask。

我的观点是:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
# do stuff
return Response(
mimetype='application/json',
status=200
)

现在,我想要的是一条线

final_file = audio_class.render_audio()

运行并提供方法返回时执行的回调,而 Flask 可以继续处理请求。这是唯一的任务,我需要 Flask 异步运行,我想一些建议,如何最好地实现这一点。

我已经看过 Twsted 和 Klein 了,但我不确定它们是否过度,因为也许线程就足够了。或许芹菜是个不错的选择?

156324 次浏览

我将使用 芹菜为您处理异步任务。您需要安装一个代理作为任务队列(推荐使用 RabbitMQ 和 Redis)。

返回文章页面

from flask import Flask
from celery import Celery


broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue


app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py


@celery.task(bind=True)
def some_long_task(self, x, y):
# Do some long task
...


@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
return Response(
mimetype='application/json',
status=200
)

运行 Flask 应用程序,然后启动另一个进程来运行芹菜工人。

$ celery worker -A app.celery --loglevel=debug

我也会参考 Miguel Gringberg 的 写下来来获得一个更深入的指南,指导如何在烧瓶中使用芹菜。

穿线是另一种可能的解决方案。尽管基于芹菜的解决方案更适合于大规模应用程序,但是如果您不希望有问题的端点上有太多的流量,那么线程处理是一个可行的替代方案。

这个解决方案是基于 Miguel Grinberg 的 PyCon 2016规模烧瓶展示,特别是 幻灯片41在他的幻灯片。他的 代码也可以在 github 上获得为那些感兴趣的原始来源。

从用户的角度来看,代码的工作原理如下:

  1. 调用执行长时间运行任务的端点。
  2. 此端点返回202Accepted,并带有检查任务状态的链接。
  3. 当 tak 仍在运行时,对状态链接的调用返回202,当任务完成时返回200(以及结果)。

要将 api 调用转换为后台任务,只需添加@sync _ api 装饰器。

下面是一个完整的例子:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid


tasks = {}


app = Flask(__name__)
api = Api(app)




@app.before_first_request
def before_first_request():
"""Start a background thread that cleans up old tasks."""
def clean_old_tasks():
"""
This function cleans up old tasks from our in-memory data structure.
"""
global tasks
while True:
# Only keep tasks that are running or that finished less than 5
# minutes ago.
five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
tasks = {task_id: task for task_id, task in tasks.items()
if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
time.sleep(60)


if not current_app.config['TESTING']:
thread = threading.Thread(target=clean_old_tasks)
thread.start()




def async_api(wrapped_function):
@wraps(wrapped_function)
def new_function(*args, **kwargs):
def task_call(flask_app, environ):
# Create a request context similar to that of the original request
# so that the task can have access to flask.g, flask.request, etc.
with flask_app.request_context(environ):
try:
tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
except HTTPException as e:
tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
except Exception as e:
# The function raised an exception, so we set a 500 error
tasks[task_id]['return_value'] = InternalServerError()
if current_app.debug:
# We want to find out if something happened so reraise
raise
finally:
# We record the time of the response, to help in garbage
# collecting old tasks
tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())


# close the database session (if any)


# Assign an id to the asynchronous task
task_id = uuid.uuid4().hex


# Record the task, and then launch it
tasks[task_id] = {'task_thread': threading.Thread(
target=task_call, args=(current_app._get_current_object(),
request.environ))}
tasks[task_id]['task_thread'].start()


# Return a 202 response, with a link that the client can use to
# obtain task status
print(url_for('gettaskstatus', task_id=task_id))
return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
return new_function




class GetTaskStatus(Resource):
def get(self, task_id):
"""
Return status about an asynchronous task. If this request returns a 202
status code, it means that task hasn't finished yet. Else, the response
from the task is returned.
"""
task = tasks.get(task_id)
if task is None:
abort(404)
if 'return_value' not in task:
return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
return task['return_value']




class CatchAll(Resource):
@async_api
def get(self, path=''):
# perform some intensive processing
print("starting processing task, path: '%s'" % path)
time.sleep(10)
print("completed processing task, path: '%s'" % path)
return f'The answer is: {path}'




api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')




if __name__ == '__main__':
app.run(debug=True)


您还可以尝试将 multiprocessing.Processdaemon=True一起使用; process.start()方法不会阻塞,您可以在后台执行代价高昂的函数时立即向调用者返回响应/状态。

我在使用 猎鹰框架和使用 daemon过程时遇到了类似的问题。

你需要做以下几件事:

from multiprocessing import Process


@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
heavy_process = Process(  # Create a daemonic process with heavy "my_func"
target=my_func,
daemon=True
)
heavy_process.start()
return Response(
mimetype='application/json',
status=200
)


# Define some heavy function
def my_func():
time.sleep(10)
print("Process finished")

您应该立即得到响应,在10秒之后,您应该在控制台中看到一条打印的消息。

注意: 请记住,daemonic进程不允许产生任何子进程。

烧瓶2.0版

Flask 2.0现在支持异步路由。您可以使用 httpx 库并为此使用异步协程。您可以像下面这样修改代码

@app.route('/render/<id>', methods=['POST'])
async def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file =  await asyncio.gather(
audio_class.render_audio(data=text_list),
do_other_stuff_function()
)
# Just make sure that the coroutine should not  having any blocking calls inside it.
return Response(
mimetype='application/json',
status=200
)

上面的代码只是一个伪代码,但是您可以了解使用 flask 2.0时异步是如何工作的,对于 HTTP 调用,您可以使用 httpx。并且还要确保协程只做一些 I/O 任务。

如果使用 redis,则可以使用 Pubsub事件处理后台任务。 更多详情请见: 《 https://redis.com/ebook/part-2-core-concepts/chapter-3-commands-in-redis/3-6-publishsubscribe/》