Flask自身并没有异步功能,但是后端遇到些超时操作,前端只能等待,那么Celery就可以实现异步操作,满足开发中的需求。
0x01 Celery
Celery是一个简单、灵活而且可靠的,能够处理大量消息的分布式系统,并且提供维护这样一个系统的必要工具,Celery支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。
Celery的架构有消息中间件(message broker),执行任务单元(worker)和任务执行结果存储(task result store)三个部分组成。
消息中间件
Celery本身不提供消息服务,但是可以方便和第三方提供的消息中间件集成。包括RabbitMQ、Redis等等执行任务单元
Worker是Celery提供的认为执行单元,worker并发运行在分布式的系统节点中。任务结果存储
Task result store用来存储Worker执行的任务结果,Celery支持不同方式存储任务结果,包括AMQP、Redis等等。
Celery统筹三个单元,相互合作完成任务,充分运用了生产者消费者模型,而Wokers可以分布式部署,增加代码执行效率。用户下发任务到消息队列中,而Worker不断监视消息队列的状况,一单任务下发,Worker获得任务,开始工作,完后工作后在把返回的结果存入任务结果存储单元中。
Celery可以配置参数协调任务队列进行任务调度,支持及时任务以及定时任务,自身可以解决任务逻辑,减少程序开销,加快项目开发。
0x02 Flask中使用Celery
在Flask中就可以把耗时任务交给Celery,这样可以优化前端用户体验,就给用户发送邮件功能而说,这就是一个耗时任务,如果不使用Celery的话,同步实现发邮件,前台用户在点击获取邮件后,会在3-4秒后页面才会弹窗,发送邮件成功,这个是个同步运行过程,用户体验不好,并且在这3-4秒钟内,并不知道是否执行发送邮件的命令。使用Celery就不同了,用户点击发送邮件按钮后,会把任务交给Celery来完成,并不需要等待发送邮件的过程。
0x03 实例
说了这么多,很多人还说not give me bb, show me in code,好的,我就发送邮件功能给出个栗子。
- Celery_Example
- config.py
 - tasks.py
 - view.py
 
 
简单的文件结构
config.py:主要写些邮件和Celery的配置文件
1 | # 发送者邮箱的服务器地址 | 
2 | MAIL_SERVER = "smtp.qq.com" | 
3 | MAIL_PORT = '587' | 
4 | MAIL_USE_TLS = True | 
5 | # MAIL_USE_SSL | 
6 | MAIL_USERNAME = 'xxxx@qq.com' | 
7 | MAIL_PASSWORD = 'xxxx' | 
8 | MAIL_DEFAULT_SENDER = 'xxxx@qq.com' | 
9 | |
10 | #celery相关的配置, 中间件使用redis | 
11 | CELERY_RESULT_BACKEND = 'redis://xxxx:6379/x'  #任务结果存储 | 
12 | CELERY_BROKER_URL = 'redis://xxxx:6379/x'  #消息存储 | 
tasks.py: 创建Celery实例绑定app并创建发送邮件任务
1 | # -*- coding: UTF-8 -*- | 
2 | __author__ = 'Joynice' | 
3 | |
4 | from celery import Celery | 
5 | from flask_mail import Message, Mail | 
6 | from flask import Flask | 
7 | import config | 
8 | |
9 | app = Flask(__name__) | 
10 | app.config.from_object(config) | 
11 | mail = Mail() | 
12 | mail.init_app(app) | 
13 | |
14 | def make_celery(app):   # 创建Celery | 
15 |     celery = Celery( | 
16 |         app.import_name, | 
17 |         backend=app.config['CELERY_RESULT_BACKEND'], | 
18 |         broker=app.config['CELERY_BROKER_URL'] | 
19 |     ) | 
20 |     celery.conf.update(app.config) | 
21 | |
22 |     class ContextTask(celery.Task): | 
23 |         def __call__(self, *args, **kwargs): | 
24 |             with app.app_context(): | 
25 |                 return self.run(*args, **kwargs) | 
26 | |
27 |     celery.Task = ContextTask | 
28 |     return celery | 
29 | |
30 | celery = make_celery(app) | 
31 | |
32 |  | 
33 | def send_mail(subject, recipients, body): | 
34 |     message = Message(subject=subject, recipients=recipients, body=body) | 
35 |     mail.send(message) | 
view.py: 在视图函数中使用发送邮件功能
1 | from tasks import send_mail | 
2 | |
3 |  | 
4 |  | 
5 | def email_captcha(): | 
6 |     email = request.args.get('email') | 
7 |     print(email) | 
8 |     if not email: | 
9 |         return restful.params_error('请传递邮箱参数!') | 
10 |     source = list(string.ascii_letters) | 
11 |     source.extend(map(lambda x: str(x), range(0, 10))) | 
12 |     captcha = "".join(random.sample(source, 6)) | 
13 |     send_mail.delay('BBS论坛邮箱验证码', [email], '您的验证码是:{}'.format(captcha))  #Celery处理发送邮件任务 | 
14 |     zlcache.set(email, captcha) | 
15 |     return restful.success() | 
这三部分是功能实现的核心,同时Celery运行命名并不是跟着app.run()运行的,要单独运行:
1 | celery 启动命令:celery -A tasks.celery worker --pool=eventlet, 启动时保证与tasks.py同级 | 
同时记录下遇到的一个坑:
1 | 若出现 'AttributeError: 'float' object has no attribute 'items'错误,这是redis包版本不兼容,退回2.10.6版本即可。 | 
2 | pip install redis==2.10.6 |