使用 Celery 的后台任务¶
如果你的应用有长时间运行的任务,例如处理一些上传的数据或发送电子邮件,你肯定不希望在请求期间等待它完成。相反,使用任务队列将必要的数据发送到另一个进程,该进程将在后台运行任务,同时请求立即返回。
Celery 是一个强大的任务队列,可以用于简单的后台任务以及复杂的多阶段程序和调度。本指南将向你展示如何使用 Flask 配置 Celery。阅读 Celery 的 First Steps with Celery 指南来学习如何使用 Celery 本身。
Flask 仓库包含一个基于此页面信息的示例,该示例还展示了如何使用 JavaScript 提交任务并轮询进度和结果。
安装¶
从 PyPI 安装 Celery,例如使用 pip
$ pip install celery
将 Celery 与 Flask 集成¶
你可以使用 Celery 而无需与 Flask 集成,但通过 Flask 的配置来配置它,并让任务访问 Flask 应用程序会更方便。
Celery 使用与 Flask 类似的思想,使用一个 Celery
应用对象,该对象具有配置并注册任务。在创建 Flask 应用时,使用以下代码来创建和配置 Celery 应用。
from celery import Celery, Task
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
这将创建并返回一个 Celery
应用对象。Celery 配置取自 Flask 配置中的 CELERY
键。Celery 应用被设置为默认应用,以便在每个请求期间可见。Task
子类自动在 Flask 应用上下文激活的情况下运行任务函数,以便数据库连接等服务可用。
这是一个基本的 example.py
,它配置 Celery 使用 Redis 进行通信。我们启用了一个结果后端,但默认情况下忽略结果。这允许我们仅为我们关心结果的任务存储结果。
from flask import Flask
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://127.0.0.1",
result_backend="redis://127.0.0.1",
task_ignore_result=True,
),
)
celery_app = celery_init_app(app)
将 celery worker
命令指向它,它将找到 celery_app
对象。
$ celery -A example worker --loglevel INFO
你还可以运行 celery beat
命令以按计划运行任务。有关定义计划的更多信息,请参阅 Celery 的文档。
$ celery -A example beat --loglevel INFO
应用工厂¶
当使用 Flask 应用工厂模式时,在工厂内部调用 celery_init_app
函数。它将 app.extensions["celery"]
设置为 Celery 应用对象,该对象可用于从工厂返回的 Flask 应用中获取 Celery 应用。
def create_app() -> Flask:
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://127.0.0.1",
result_backend="redis://127.0.0.1",
task_ignore_result=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)
return app
要使用 celery
命令,Celery 需要一个应用对象,但这不再直接可用。创建一个 make_celery.py
文件,该文件调用 Flask 应用工厂并从返回的 Flask 应用中获取 Celery 应用。
from example import create_app
flask_app = create_app()
celery_app = flask_app.extensions["celery"]
将 celery
命令指向此文件。
$ celery -A make_celery worker --loglevel INFO
$ celery -A make_celery beat --loglevel INFO
定义任务¶
使用 @celery_app.task
装饰器来装饰任务函数需要访问 celery_app
对象,这在使用工厂模式时将不可用。这也意味着装饰后的任务与特定的 Flask 和 Celery 应用实例绑定,如果在测试中更改配置,这可能会成为问题。
相反,使用 Celery 的 @shared_task
装饰器。这将创建任务对象,这些对象将访问“当前应用”,这与 Flask 的蓝图和应用上下文的概念类似。这就是为什么我们在上面调用 celery_app.set_default()
的原因。
这是一个示例任务,它将两个数字相加并返回结果。
from celery import shared_task
@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
return a + b
之前,我们将 Celery 配置为默认忽略任务结果。由于我们想知道此任务的返回值,因此我们设置了 ignore_result=False
。另一方面,不需要结果的任务(例如发送电子邮件)不会设置此项。
调用任务¶
装饰后的函数变成一个任务对象,其中包含在后台调用它的方法。最简单的方法是使用 delay(*args, **kwargs)
方法。有关更多方法,请参阅 Celery 的文档。
必须运行 Celery worker 才能运行任务。启动 worker 在前面的章节中已展示。
from flask import request
@app.post("/add")
def start_add() -> dict[str, object]:
a = request.form.get("a", type=int)
b = request.form.get("b", type=int)
result = add_together.delay(a, b)
return {"result_id": result.id}
路由不会立即获得任务的结果。这将通过阻止响应而达不到目的。相反,我们返回正在运行的任务的结果 ID,我们稍后可以使用它来获取结果。
获取结果¶
要获取我们上面启动的任务的结果,我们将添加另一个路由,该路由接受我们之前返回的结果 ID。如果任务已完成,我们将返回任务是否完成(就绪)、是否成功完成以及返回值(或错误)。
from celery.result import AsyncResult
@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
result = AsyncResult(id)
return {
"ready": result.ready(),
"successful": result.successful(),
"value": result.result if result.ready() else None,
}
现在你可以使用第一个路由启动任务,然后使用第二个路由轮询结果。这可以防止 Flask 请求 worker 在等待任务完成时被阻塞。
Flask 仓库包含一个使用 JavaScript 提交任务并轮询进度和结果的示例。
将数据传递给任务¶
上面的 “add” 任务接受两个整数作为参数。要将参数传递给任务,Celery 必须将它们序列化为它可以传递给其他进程的格式。因此,不建议传递复杂对象。例如,不可能传递 SQLAlchemy 模型对象,因为该对象可能不可序列化,并且绑定到查询它的会话。
传递在任务中获取或重新创建任何复杂数据所需的最小量数据。考虑一个任务,该任务将在登录用户请求其数据存档时运行。Flask 请求知道登录用户,并具有从数据库查询的用户对象。它是通过为给定 ID 查询数据库获得的,因此任务可以做同样的事情。传递用户的 ID 而不是用户对象。
@shared_task
def generate_user_archive(user_id: str) -> None:
user = db.session.get(User, user_id)
...
generate_user_archive.delay(current_user.id)