在前后端分离中,后端就要使用Token来管理用户,Flask在设计API的时候,也要使用Token来代理Session来进行用户管理,这篇文章主要记录如何使用Token管理用户。
0x01 认证方式
基于Token的认证方式非常多,BasicAuth
、OAuth1.0
等非常多认证方式,这里主要使用BasicAuth
进行身份认证,这种认证方式也是非常简单的认证方式,说白了就是用户名 + 密码然后进行Base64加密。这种加密的作用也仅仅是让可信任的用户不太可能在进行网络观测时无意中看到密码, 而不能防止恶意用户。 所以也仅限在一些安全要求不是那么高的场景下使用。
然后在生产环境中,可以使用其他认证方式。
1 | from flask_httpauth import HTTPBasicAuth |
2 | |
3 | auth = HTTPBasicAuth() |
4 | User = namedtuple('User', ['uid', 'scope']) |
5 | |
6 |
|
7 | def verify_password(token, password): |
8 | ''' |
9 | 这里直接传入Token值,即为Username+Password base64加密后的值,格式如下 |
10 | # key=Authorization |
11 | # value =basic base64(username:password) |
12 | ''' |
13 | user_info = verify_auth_token(token) |
14 | if not user_info: |
15 | return False |
16 | else: |
17 | g.user = user_info #在敏感接口,不需要传递查询参数,防止越权访问 |
18 | return True |
19 | |
20 | def verify_auth_token(token): |
21 | s = Serializer(current_app.config['SECRET_KEY']) #传入secrte_key产生一个序列化对象 |
22 | try: |
23 | data = s.loads(token) #进行token反序列化,获取其中信息 |
24 | except BadSignature: #如果返回BadSignature,token无效 |
25 | raise AuthFailed(message='token is invalid') |
26 | except SignatureExpired: #如果返回SignatureExpired,token失效 |
27 | raise AuthFailed(message='token is expired') |
28 | uid = data['uid'] #获取user id |
29 | scope = data['scope'] #获取用户权限作用域 |
30 | # request 视图函数 |
31 | allow = is_in_scope(scope, request.endpoint) #鉴权 传入用户作用域,以及当前访问的路由定于的函数名 |
32 | if not allow: |
33 | raise Forbidden() |
34 | return User(uid, scope) |
35 | |
36 | # 路由保护 |
37 |
|
38 | def person(): |
39 | pass |
0x02 登录
登录就是用户传入用户名密码,进行数据库比对,如果认证成功返回Token的过程
1 | |
2 | def generate_auth_token(uid, scope=None, |
3 | expiration=7200): |
4 | """生成Token""" |
5 | s = Serializer(current_app.config['SECRET_KEY'], |
6 | expires_in=expiration) |
7 | return s.dumps({ |
8 | 'uid': uid, |
9 | 'scope':scope |
10 | }) |
11 | |
12 | class InterfaceLogin(views.MethodView): |
13 | ''' |
14 | 登录 |
15 | ''' |
16 |
|
17 | def post(self, version): |
18 | form = LoginForm().validate_for_api() |
19 | identity = User.verify(form.username.data, form.password.data) #进行用户名密码校验 |
20 | expiration = current_app.config['TOKEN_EXPIRATION'] #获取配置的Token过期时间 |
21 | token = generate_auth_token(identity['uid'], |
22 | identity['scope'], |
23 | expiration).decode('ascii') #生成Token进行解码 |
24 | return PostSuccess(data={'token': token}, message='登录成功') #返回Token信息 |
0x03 用户鉴权
权限实现主要依靠scope
作用域即用户当前的权限作用域是否拥有访问某个路由函数的权限,如当前用户的权限为普通用户,他权限作用域中的可以访问的路由函数只有[user.login, user.persion]这样的话就要解密Token中的uid和scope就可以判断用户是否具有访问权限了
1 | # -*- coding: UTF-8 -*- |
2 | __author__ = 'Joynice' |
3 | |
4 | class Scope: |
5 | allow_api = [] |
6 | |
7 | def __add__(self, other): |
8 | self.allow_api = self.allow_api + other.allow_api |
9 | self.allow_api = list(set(self.allow_api)) |
10 | # 运算符重载 |
11 | return self |
12 | |
13 | class UserScope(Scope): |
14 | allow_api = ['user.login', 'user.register'] |
15 | |
16 | |
17 | class AdminScope(Scope): |
18 | allow_api = ['challenges.challenges'] |
19 | |
20 | def __init__(self): |
21 | self + UserScope() |
22 | print(self.allow_api) |
23 | |
24 | class AdministratorsScope(Scope): |
25 | allow_api = [] |
26 | |
27 | def __init__(self): |
28 | self + AdminScope() |
29 | |
30 | def is_in_scope(scope, endpoint): |
31 | scope = globals()[scope]() |
32 | if endpoint in scope.allow_api: |
33 | return True |
34 | else: |
35 | return False |
同时在用户模型对象中存入相应的权限值
1 | class User(Base): |
2 | ''' |
3 | 用户:用户id、用户名、分数、授权登录时间(第一次使用本平台)、上次登录时间、本地登录时间 |
4 | 第一次在前端使用本平台,会检查下sso中的id是否存在此表,若不存在则新增,代表用户授权使用本项目 |
5 | ''' |
6 | __tablename__ = 't_user' |
7 | id = db.Column(db.String(50), primary_key=True, default=shortuuid.uuid) |
8 | username = db.Column(db.String(20), comment='用户名', unique=True, nullable=False) |
9 | email = db.Column(db.String(255), comment='邮箱', unique=True, nullable=False) |
10 | _password = db.Column('password', db.String(100)) |
11 | auth = db.Column(db.SmallInteger, default=1, comment='用于权限认证,1:普通用户;2:管理员;3:超级管理员') |
12 | |
13 |
|
14 | def verify(username, password): |
15 | print(username, password) |
16 | user = User.query.filter_by(username=username).first_or_404() |
17 | if not user.check_password(password): |
18 | raise AuthFailed() |
19 | if user.auth == 1: |
20 | scope = 'UserScope' |
21 | elif user.auth == 2: |
22 | scope = 'AdminScope' |
23 | elif user.auth == 3: |
24 | scope = 'AdministratorsScope' |
25 | else: |
26 | raise AuthFailed() |
27 | return {'uid': user.id, 'scope': scope} #返回用户权限 |
这样通过is_in_scope
函数就可以进行用户鉴权了。
0x04 总结
这也是我看Flask课程总结下来,修改下,使用到自己的项目中,经过仔细的研究相应的逻辑,基本理清了思路,这里主要粘贴下核心代码,方便以后复用。