Comments

在 Flask 项目的 celery 中使用 gevent

在 Flask 项目中使用 Celery 这篇文章谈到了如何在 Flask 项目中集成 Celery,也讲了在 celery 任务中引用 Flask 的 application context 的方法。一般情况下那样使用是没问题的,但是如果需要在 task 中使用 gevent,就需要一些额外的改进。至少有两点。

1. 使用 gevent 并发模型

如果在 task 中要使用 gevent,就必须使用 gevent 并发模型。这很好处理,只需要修改启动选项就行:

$ celery worker -A celery_worker.celery -P gevent -c 10 -l INFO

上面的命令,-P 选项指定 pool,默认是 prefork,这里是 gevent; -c 设置并发数。

2. 引用 Flask 的 application context

这个问题也是在 在 Flask 项目中使用 Celery 中重点讨论的,在这种场景下,上文的解决方法起不到作用,仍然会报错(具体原因不太懂,知道的朋友请不吝赐教)。解决方案就是,把需要引用 Flask app 的地方(如 app.config),放到 Flask 的 application context 里执行,如:

with app.app_context():
    print app.config.get('SOME_CONFIG_KEY')

在实际应用中,我最后写了个装饰器来实现这个目的。简单介绍一下场景,项目用到了 Flask-Cache,项目启动时会创建全局单例 cache,并在 create_app 中进行初始化。在 Flask-Cache 初始化时,会把当前的 Flask app 对象绑定到实例 cache 中,所以可以尝试从这里获取 app 对象。

代码的目录结构与之前一样:

.
├── README.md
├── app
│   ├── __init__.py
│   ├── config.py
│   ├── forms
│   ├── models
│   ├── tasks
│   │   ├── __init__.py
│   │   └── email.py
│   └── views
│   │   ├── __init__.py
│   │   └── account.py
├── celery_worker.py
├── manage.py
└── wsgi.py

装饰器:

def with_app_context(task):
    memo = {'app': None}

    @functools.wraps(task)
    def _wrapper(*args, **kwargs):
        if not memo['app']:
            try:
                # 尝试从 cache 中获取 app 对象,如果得到的不是 None,就不需要重复创建了
                app = cache.app
                _ = app.name
            except Exception:
                from app import create_app

                app = create_app()
            memo['app'] = app
        else:
            app = memo['app']

        # 把 task 放到 application context 环境中运行
        with app.app_context():
            return task(*args, **kwargs)

    return _wrapper

使用:

@celery.task()
@with_app_context
def add(x, y):
    print app.config.get('SOME_CONFIG_KEY')
    return x + y

Comments

comments powered by Disqus