Flask自身并没有异步功能,但是后端遇到些超时操作,前端只能等待,那么Celery就可以实现异步操作,满足开发中的需求。
0x01 Celery
Celery是一个简单、灵活而且可靠的,能够处理大量消息的分布式系统,并且提供维护这样一个系统的必要工具,Celery支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。
Celery的架构有消息中间件(message broker),执行任务单元(worker)和任务执行结果存储(task result store)三个部分组成。
消息中间件
Celery本身不提供消息服务,但是可以方便和第三方提供的消息中间件集成。包括RabbitMQ、Redis等等执行任务单元
Worker是Celery提供的认为执行单元,worker并发运行在分布式的系统节点中。任务结果存储
Task result store用来存储Worker执行的任务结果,Celery支持不同方式存储任务结果,包括AMQP、Redis等等。
Celery统筹三个单元,相互合作完成任务,充分运用了生产者消费者模型,而Wokers可以分布式部署,增加代码执行效率。用户下发任务到消息队列中,而Worker不断监视消息队列的状况,一单任务下发,Worker获得任务,开始工作,完后工作后在把返回的结果存入任务结果存储单元中。
Celery可以配置参数协调任务队列进行任务调度,支持及时任务以及定时任务,自身可以解决任务逻辑,减少程序开销,加快项目开发。
0x02 Flask中使用Celery
在Flask中就可以把耗时任务交给Celery,这样可以优化前端用户体验,就给用户发送邮件功能而说,这就是一个耗时任务,如果不使用Celery的话,同步实现发邮件,前台用户在点击获取邮件后,会在3-4秒后页面才会弹窗,发送邮件成功,这个是个同步运行过程,用户体验不好,并且在这3-4秒钟内,并不知道是否执行发送邮件的命令。使用Celery就不同了,用户点击发送邮件按钮后,会把任务交给Celery来完成,并不需要等待发送邮件的过程。
0x03 实例
说了这么多,很多人还说not give me bb, show me in code
,好的,我就发送邮件功能给出个栗子。
- Celery_Example
- config.py
- tasks.py
- view.py
简单的文件结构
config.py:主要写些邮件和Celery的配置文件
1 | # 发送者邮箱的服务器地址 |
2 | MAIL_SERVER = "smtp.qq.com" |
3 | MAIL_PORT = '587' |
4 | MAIL_USE_TLS = True |
5 | # MAIL_USE_SSL |
6 | MAIL_USERNAME = 'xxxx@qq.com' |
7 | MAIL_PASSWORD = 'xxxx' |
8 | MAIL_DEFAULT_SENDER = 'xxxx@qq.com' |
9 | |
10 | #celery相关的配置, 中间件使用redis |
11 | CELERY_RESULT_BACKEND = 'redis://xxxx:6379/x' #任务结果存储 |
12 | CELERY_BROKER_URL = 'redis://xxxx:6379/x' #消息存储 |
tasks.py: 创建Celery实例绑定app并创建发送邮件任务
1 | # -*- coding: UTF-8 -*- |
2 | __author__ = 'Joynice' |
3 | |
4 | from celery import Celery |
5 | from flask_mail import Message, Mail |
6 | from flask import Flask |
7 | import config |
8 | |
9 | app = Flask(__name__) |
10 | app.config.from_object(config) |
11 | mail = Mail() |
12 | mail.init_app(app) |
13 | |
14 | def make_celery(app): # 创建Celery |
15 | celery = Celery( |
16 | app.import_name, |
17 | backend=app.config['CELERY_RESULT_BACKEND'], |
18 | broker=app.config['CELERY_BROKER_URL'] |
19 | ) |
20 | celery.conf.update(app.config) |
21 | |
22 | class ContextTask(celery.Task): |
23 | def __call__(self, *args, **kwargs): |
24 | with app.app_context(): |
25 | return self.run(*args, **kwargs) |
26 | |
27 | celery.Task = ContextTask |
28 | return celery |
29 | |
30 | celery = make_celery(app) |
31 | |
32 |
|
33 | def send_mail(subject, recipients, body): |
34 | message = Message(subject=subject, recipients=recipients, body=body) |
35 | mail.send(message) |
view.py: 在视图函数中使用发送邮件功能
1 | from tasks import send_mail |
2 | |
3 |
|
4 |
|
5 | def email_captcha(): |
6 | email = request.args.get('email') |
7 | print(email) |
8 | if not email: |
9 | return restful.params_error('请传递邮箱参数!') |
10 | source = list(string.ascii_letters) |
11 | source.extend(map(lambda x: str(x), range(0, 10))) |
12 | captcha = "".join(random.sample(source, 6)) |
13 | send_mail.delay('BBS论坛邮箱验证码', [email], '您的验证码是:{}'.format(captcha)) #Celery处理发送邮件任务 |
14 | zlcache.set(email, captcha) |
15 | return restful.success() |
这三部分是功能实现的核心,同时Celery运行命名并不是跟着app.run()
运行的,要单独运行:
1 | celery 启动命令:celery -A tasks.celery worker --pool=eventlet, 启动时保证与tasks.py同级 |
同时记录下遇到的一个坑:
1 | 若出现 'AttributeError: 'float' object has no attribute 'items'错误,这是redis包版本不兼容,退回2.10.6版本即可。 |
2 | pip install redis==2.10.6 |