使用 Celery 的后台任务

如果你的应用有长时间运行的任务,例如处理一些上传的数据或发送电子邮件,你肯定不希望在请求期间等待它完成。相反,使用任务队列将必要的数据发送到另一个进程,该进程将在后台运行任务,同时请求立即返回。

Celery 是一个强大的任务队列,可以用于简单的后台任务以及复杂的多阶段程序和调度。本指南将向你展示如何使用 Flask 配置 Celery。阅读 Celery 的 First Steps with Celery 指南来学习如何使用 Celery 本身。

Flask 仓库包含一个基于此页面信息的示例,该示例还展示了如何使用 JavaScript 提交任务并轮询进度和结果。

安装

从 PyPI 安装 Celery,例如使用 pip

$ pip install celery

将 Celery 与 Flask 集成

你可以使用 Celery 而无需与 Flask 集成,但通过 Flask 的配置来配置它,并让任务访问 Flask 应用程序会更方便。

Celery 使用与 Flask 类似的思想,使用一个 Celery 应用对象,该对象具有配置并注册任务。在创建 Flask 应用时,使用以下代码来创建和配置 Celery 应用。

from celery import Celery, Task

def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app

这将创建并返回一个 Celery 应用对象。Celery 配置取自 Flask 配置中的 CELERY 键。Celery 应用被设置为默认应用,以便在每个请求期间可见。Task 子类自动在 Flask 应用上下文激活的情况下运行任务函数,以便数据库连接等服务可用。

这是一个基本的 example.py,它配置 Celery 使用 Redis 进行通信。我们启用了一个结果后端,但默认情况下忽略结果。这允许我们仅为我们关心结果的任务存储结果。

from flask import Flask

app = Flask(__name__)
app.config.from_mapping(
    CELERY=dict(
        broker_url="redis://127.0.0.1",
        result_backend="redis://127.0.0.1",
        task_ignore_result=True,
    ),
)
celery_app = celery_init_app(app)

celery worker 命令指向它,它将找到 celery_app 对象。

$ celery -A example worker --loglevel INFO

你还可以运行 celery beat 命令以按计划运行任务。有关定义计划的更多信息,请参阅 Celery 的文档。

$ celery -A example beat --loglevel INFO

应用工厂

当使用 Flask 应用工厂模式时,在工厂内部调用 celery_init_app 函数。它将 app.extensions["celery"] 设置为 Celery 应用对象,该对象可用于从工厂返回的 Flask 应用中获取 Celery 应用。

def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://127.0.0.1",
            result_backend="redis://127.0.0.1",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)
    return app

要使用 celery 命令,Celery 需要一个应用对象,但这不再直接可用。创建一个 make_celery.py 文件,该文件调用 Flask 应用工厂并从返回的 Flask 应用中获取 Celery 应用。

from example import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]

celery 命令指向此文件。

$ celery -A make_celery worker --loglevel INFO
$ celery -A make_celery beat --loglevel INFO

定义任务

使用 @celery_app.task 装饰器来装饰任务函数需要访问 celery_app 对象,这在使用工厂模式时将不可用。这也意味着装饰后的任务与特定的 Flask 和 Celery 应用实例绑定,如果在测试中更改配置,这可能会成为问题。

相反,使用 Celery 的 @shared_task 装饰器。这将创建任务对象,这些对象将访问“当前应用”,这与 Flask 的蓝图和应用上下文的概念类似。这就是为什么我们在上面调用 celery_app.set_default() 的原因。

这是一个示例任务,它将两个数字相加并返回结果。

from celery import shared_task

@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
    return a + b

之前,我们将 Celery 配置为默认忽略任务结果。由于我们想知道此任务的返回值,因此我们设置了 ignore_result=False。另一方面,不需要结果的任务(例如发送电子邮件)不会设置此项。

调用任务

装饰后的函数变成一个任务对象,其中包含在后台调用它的方法。最简单的方法是使用 delay(*args, **kwargs) 方法。有关更多方法,请参阅 Celery 的文档。

必须运行 Celery worker 才能运行任务。启动 worker 在前面的章节中已展示。

from flask import request

@app.post("/add")
def start_add() -> dict[str, object]:
    a = request.form.get("a", type=int)
    b = request.form.get("b", type=int)
    result = add_together.delay(a, b)
    return {"result_id": result.id}

路由不会立即获得任务的结果。这将通过阻止响应而达不到目的。相反,我们返回正在运行的任务的结果 ID,我们稍后可以使用它来获取结果。

获取结果

要获取我们上面启动的任务的结果,我们将添加另一个路由,该路由接受我们之前返回的结果 ID。如果任务已完成,我们将返回任务是否完成(就绪)、是否成功完成以及返回值(或错误)。

from celery.result import AsyncResult

@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
    result = AsyncResult(id)
    return {
        "ready": result.ready(),
        "successful": result.successful(),
        "value": result.result if result.ready() else None,
    }

现在你可以使用第一个路由启动任务,然后使用第二个路由轮询结果。这可以防止 Flask 请求 worker 在等待任务完成时被阻塞。

Flask 仓库包含一个使用 JavaScript 提交任务并轮询进度和结果的示例

将数据传递给任务

上面的 “add” 任务接受两个整数作为参数。要将参数传递给任务,Celery 必须将它们序列化为它可以传递给其他进程的格式。因此,不建议传递复杂对象。例如,不可能传递 SQLAlchemy 模型对象,因为该对象可能不可序列化,并且绑定到查询它的会话。

传递在任务中获取或重新创建任何复杂数据所需的最小量数据。考虑一个任务,该任务将在登录用户请求其数据存档时运行。Flask 请求知道登录用户,并具有从数据库查询的用户对象。它是通过为给定 ID 查询数据库获得的,因此任务可以做同样的事情。传递用户的 ID 而不是用户对象。

@shared_task
def generate_user_archive(user_id: str) -> None:
    user = db.session.get(User, user_id)
    ...

generate_user_archive.delay(current_user.id)