Flask中使用celery实现异步操作

Flask自身并没有异步功能,但是后端遇到些超时操作,前端只能等待,那么Celery就可以实现异步操作,满足开发中的需求。

0x01 Celery

Celery是一个简单、灵活而且可靠的,能够处理大量消息的分布式系统,并且提供维护这样一个系统的必要工具,Celery支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。

Celery的架构有消息中间件(message broker),执行任务单元(worker)和任务执行结果存储(task result store)三个部分组成。

  1. 消息中间件
    Celery本身不提供消息服务,但是可以方便和第三方提供的消息中间件集成。包括RabbitMQ、Redis等等

  2. 执行任务单元
    Worker是Celery提供的认为执行单元,worker并发运行在分布式的系统节点中。

  3. 任务结果存储
    Task result store用来存储Worker执行的任务结果,Celery支持不同方式存储任务结果,包括AMQP、Redis等等。
    Celery统筹三个单元,相互合作完成任务,充分运用了生产者消费者模型,而Wokers可以分布式部署,增加代码执行效率。用户下发任务到消息队列中,而Worker不断监视消息队列的状况,一单任务下发,Worker获得任务,开始工作,完后工作后在把返回的结果存入任务结果存储单元中。

Celery可以配置参数协调任务队列进行任务调度,支持及时任务以及定时任务,自身可以解决任务逻辑,减少程序开销,加快项目开发。

0x02 Flask中使用Celery

在Flask中就可以把耗时任务交给Celery,这样可以优化前端用户体验,就给用户发送邮件功能而说,这就是一个耗时任务,如果不使用Celery的话,同步实现发邮件,前台用户在点击获取邮件后,会在3-4秒后页面才会弹窗,发送邮件成功,这个是个同步运行过程,用户体验不好,并且在这3-4秒钟内,并不知道是否执行发送邮件的命令。使用Celery就不同了,用户点击发送邮件按钮后,会把任务交给Celery来完成,并不需要等待发送邮件的过程。

0x03 实例

说了这么多,很多人还说not give me bb, show me in code,好的,我就发送邮件功能给出个栗子。

  • Celery_Example
    • config.py
    • tasks.py
    • view.py

简单的文件结构

config.py:主要写些邮件和Celery的配置文件

1
# 发送者邮箱的服务器地址
2
MAIL_SERVER = "smtp.qq.com"
3
MAIL_PORT = '587'
4
MAIL_USE_TLS = True
5
# MAIL_USE_SSL
6
MAIL_USERNAME = 'xxxx@qq.com'
7
MAIL_PASSWORD = 'xxxx'
8
MAIL_DEFAULT_SENDER = 'xxxx@qq.com'
9
10
#celery相关的配置, 中间件使用redis
11
CELERY_RESULT_BACKEND = 'redis://xxxx:6379/x'  #任务结果存储
12
CELERY_BROKER_URL = 'redis://xxxx:6379/x'  #消息存储

tasks.py: 创建Celery实例绑定app并创建发送邮件任务

1
# -*- coding: UTF-8 -*-
2
__author__ = 'Joynice'
3
4
from celery import Celery
5
from flask_mail import Message, Mail
6
from flask import Flask
7
import config
8
9
app = Flask(__name__)
10
app.config.from_object(config)
11
mail = Mail()
12
mail.init_app(app)
13
14
def make_celery(app):   # 创建Celery
15
    celery = Celery(
16
        app.import_name,
17
        backend=app.config['CELERY_RESULT_BACKEND'],
18
        broker=app.config['CELERY_BROKER_URL']
19
    )
20
    celery.conf.update(app.config)
21
22
    class ContextTask(celery.Task):
23
        def __call__(self, *args, **kwargs):
24
            with app.app_context():
25
                return self.run(*args, **kwargs)
26
27
    celery.Task = ContextTask
28
    return celery
29
30
celery = make_celery(app)
31
32
@celery.task    #创建发送邮件任务
33
def send_mail(subject, recipients, body):
34
    message = Message(subject=subject, recipients=recipients, body=body)
35
    mail.send(message)

view.py: 在视图函数中使用发送邮件功能

1
from tasks import send_mail
2
3
@bp.route('/email_captcha/')
4
@login_required
5
def email_captcha():
6
    email = request.args.get('email')
7
    print(email)
8
    if not email:
9
        return restful.params_error('请传递邮箱参数!')
10
    source = list(string.ascii_letters)
11
    source.extend(map(lambda x: str(x), range(0, 10)))
12
    captcha = "".join(random.sample(source, 6))
13
    send_mail.delay('BBS论坛邮箱验证码', [email], '您的验证码是:{}'.format(captcha))  #Celery处理发送邮件任务
14
    zlcache.set(email, captcha)
15
    return restful.success()

这三部分是功能实现的核心,同时Celery运行命名并不是跟着app.run()运行的,要单独运行:

1
celery 启动命令:celery -A tasks.celery worker --pool=eventlet, 启动时保证与tasks.py同级

同时记录下遇到的一个坑:

1
若出现 'AttributeError: 'float' object has no attribute 'items'错误,这是redis包版本不兼容,退回2.10.6版本即可。
2
pip install redis==2.10.6