本文主要参考国外大佬的文章教你如何使用JWT框架实现Flask API的身份认证。
JSON Web Token(JWT)是一个非常流行的WEB认证认证框架,JWT可以说是SPA和web服务器之间授权和通信的标准做法。在本文中,我们会构建一个具有JWT授权的Flask web服务器。
完整的代码从这里下载:https://github.com/dickens88/flask-jwt-demo
步骤1 工程构建
本工程基于Python3.6或以上版本,先执行pip
安装依赖包:
pip install flask flask-restful flask-jwt-extended passlib flask-sqlalchemy
新建一个Python工程(你可以选择pycharm作为开发IDE工具),我们将会用到以下几个工具:
- Flask-RESTful — 创建API端点
- Flask-JWT-Extended — 生成和校验JWT
- passlib — 生成密码的摘要值
- Flask-SQLAlchemy — 数据库对象的ORM映射
为了简化这个例子,我们会将工程目录扁平化,工程一共有4个文件,结构如下:
让我们确认一下flask是否可以正常工作,在工程中新建flask-jwt ├── views.py # views of the server ├── models.py # database models ├── resources.py # API endpoints └── run.py # main script to start the server
run.py
文件,加入如下代码:
然后再from flask import Flask app = Flask(__name__) import views, models, resources
view.py
中加入:
这是一个简单的例子,from run import app from flask import jsonify @app.route('/') def index(): return jsonify({'message': 'Hello, World!'})
models.py
和resources.py
文件目前还是空的,在命令行执行如下命令启动Flask应用
你会看到类似如下语句回显:set FLASK_DEBUG=1 set FLASK_APP=run.py flask run
`
- Serving Flask app “run”
- Forcing debug mode on
- Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
- Restarting with stat
- Debugger is active!
`
用浏览器访问localhost:5000
你将或看到一个JSON的“Hello World”
响应。为了更好的测试我们的应用,建议使用 Postman进行后续的测试。
步骤2 添加API接口
下一步将要构建API访问的接口,下面的代码先不实现具体的业务逻辑,而是先尝试理解整个应用的逻辑结构。
在resources.py
加入下面的代码:
from flask_restful import Resource
class UserRegistration(Resource):
def post(self):
return {'message': 'User registration'}
class UserLogin(Resource):
def post(self):
return {'message': 'User login'}
class UserLogoutAccess(Resource):
def post(self):
return {'message': 'User logout'}
class UserLogoutRefresh(Resource):
def post(self):
return {'message': 'User logout'}
class TokenRefresh(Resource):
def post(self):
return {'message': 'Token refresh'}
class AllUsers(Resource):
def get(self):
return {'message': 'List of users'}
def delete(self):
return {'message': 'Delete all users'}
class SecretResource(Resource):
def get(self):
return {
'answer': 42
}
在最上面,我们从Resource
包引入Flask-RESTful,这个包提供了一种更优雅的方式处理RESTful API,在RESTful API中每个接口都被称作资源
,上面的代码中每个类都继承自Resource
也就拥有了API端点的全部特性。
这里我们创建了7个resource
- 用户注册和登录
- access token和refresh token的登出
- token 刷新
- 获取已注册用户列表 (仅用于测试目的)
- 获取数据接口 (用来测试access token)
除了最后2个接口以外其他接口都接受POST方法,AllUsers
接受GET和DELETE,Secret
只接受GET。如果你熟悉Flask API的话,你知道服务器所有返回内容都应该被jsonify()
包装, 而Flask-RESTful
已经帮你实现了这个逻辑,所以你不需要显示调用jsonify。
下一步需要把我们的接口注册到应用,打开run.py
添加代码:
from flask import Flask
from flask_restful import Api
app = Flask(__name__)
api = Api(app)
import views, models, resources
api.add_resource(resources.UserRegistration, '/registration')
api.add_resource(resources.UserLogin, '/login')
api.add_resource(resources.UserLogoutAccess, '/logout/access')
api.add_resource(resources.UserLogoutRefresh, '/logout/refresh')
api.add_resource(resources.TokenRefresh, '/token/refresh')
api.add_resource(resources.AllUsers, '/users')
api.add_resource(resources.SecretResource, '/secret')
这里我们引入flask-restful的Api类来初始化API接口,并给他们绑定一个uri路径。尝试使用POST请求发送到localhost:5000/registration,你将会收到一个 ‘User registration’回应。
接下来我们给POST添加参数解析逻辑,Flask-RESTful内建有参数解析器,添加如下代码到resources.py
from flask_restful import Resource, reqparse
parser = reqparse.RequestParser()
parser.add_argument('username', help = 'This field cannot be blank', required = True)
parser.add_argument('password', help = 'This field cannot be blank', required = True)
我们从 flask-restful
引入reqparse
,我们首先用reqparse.RequestParser()
初始化parser,然后给他添加参数username
and password
并制定这两个参数为必选(参考reqparse文档)
把parser加入到相关函数:
class UserRegistration(Resource):
def post(self):
data = parser.parse_args()
return data
class UserLogin(Resource):
def post(self):
data = parser.parse_args()
return data
现在/registration和/login接口的所有调用都被要求提供那两个参数,你可以用postman尝试加参数域不加参数的变化。
步骤3 注册和登录
这部分我们将添加数据库支持,并模拟用户注册和登录。我们用SQLAlchemy包提供的方法连接SQLite数据库(无需安装),如果要迁移到MySQL或者其他实体数据库,代码需要做微调。
在run.py
中添加如下代码:
from flask_sqlalchemy import SQLAlchemy
...
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = 'some-secret-string'
db = SQLAlchemy(app)
@app.before_first_request
def create_tables():
db.create_all()
在最上面引入SQLAlchem包,在app初始化完成后,添加配置SQLAlchemy和创建数据库对象的逻辑,db.create_all()
方法将自动创建所需的数据库表。接下来配置用户数据模型,在models.py
中添加:
from run import db
class UserModel(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key = True)
username = db.Column(db.String(120), unique = True, nullable = False)
password = db.Column(db.String(120), nullable = False)
def save_to_db(self):
db.session.add(self)
db.session.commit()
在最上面从数据库连接处添加db对象,然后声明UserModel类,这个类将会映射成数据库的表,这里我们制定id作为主键,其他成员有username和password,然后添加一个save_to_db()方法用来提交数据给数据库。
打开resource.py
修改UserRegistration
from models import UserModel
class UserRegistration(Resource):
def post(self):
data = parser.parse_args()
new_user = UserModel(
username = data['username'],
password = data['password']
)
try:
new_user.save_to_db()
return {
'message': 'User {} was created'.format( data['username'])
}
except:
return {'message': 'Something went wrong'}, 500
首先我们引入UserModel,当我们收到一个请求时将会创建一个UserModel对象,然后尝试调save_to_db()保存用户信息到数据库,如果报错则返回500状态码。
试试发一个请求给/registration接口:
同样的请求如何你再发一次,就会看到 ‘Something went wrong’的报错,这是因为数据库已经有一个相同的用户名,因此我们要加上用户名的判断逻辑,到models.py
@classmethod
def find_by_username(cls, username):
return cls.query.filter_by(username = username).first()
这个方法会根据用户名查找并返回用户信息,修改resources.py
class UserRegistration(Resource):
def post(self):
data = parser.parse_args()
if UserModel.find_by_username(data['username']):
return {'message': 'User {} already exists'. format(data['username'])}
new_user = ...
这里我们会根据用户注册请求进行判断,如果数据库已经存在同名的用户,则会返回提示信息。
接下来我们来实现UserLogin
接口:
class UserLogin(Resource):
def post(self):
data = parser.parse_args()
current_user = UserModel.find_by_username(data['username'])
if not current_user:
return {'message': 'User {} doesn\'t exist'.format(data['username'])}
if data['password'] == current_user.password:
return {'message': 'Logged in as {}'.format(current_user.username)}
else:
return {'message': 'Wrong credentials'}
这个接口做了这么几件事:
- 首先,解析请求参数
- 然后查看username
- 如果用户名不存在则返回错误信息
- 如果用户存在则检查密码
- 如果密码匹配则返回登录成功,否则失败
用Postman将之前注册的用户信息发送一个登录的POST请求给/login
之前添加的接口列表有一个方法叫AllUsers,我们可以用他测试一下user表里都有什么数据,在resources.py
中:
class AllUsers(Resource):
def get(self):
return UserModel.return_all()
def delete(self):
return UserModel.delete_all()
当GET请求/users接口时,会返回所有已注册用户的列表,如果使用DELETE方法,则会清空表中的数据,修改models.py
中return_all()和delete_all()方法
@classmethod
def return_all(cls):
def to_json(x):
return {
'username': x.username,
'password': x.password
}
return {'users': list(map(lambda x: to_json(x), UserModel.query.all()))}
@classmethod
def delete_all(cls):
try:
num_rows_deleted = db.session.query(cls).delete()
db.session.commit()
return {'message': '{} row(s) deleted'.format(num_rows_deleted)}
except:
return {'message': 'Something went wrong'}
到这里注册和登录的逻辑基本上完成了,但是这里有一个问题-我们存储了用户的原始密码,如果有人未授权访问到这个用户表,那么用户的敏感信息将会泄露,所以遵循业界通用的方法,要对密码进行加密。修改models.py
from passlib.hash import pbkdf2_sha256 as sha256
class UserModel(db.Model):
...
@staticmethod
def generate_hash(password):
return sha256.hash(password)
@staticmethod
def verify_hash(password, hash):
return sha256.verify(password, hash)
generate_hash()
将会生成一个摘要值并存在数据库里,verify_hash()
用来校验给定的密码的hash。修改resources.py
:
new_user = UserModel(
username = data['username'],
password = UserModel.generate_hash(data['password'])
)
在UserLogin里添加校验密码:
if UserModel.verify_hash(data['password'], current_user.password):
...
else:
...
现在用户的密码将以hash的方式进行存储和校验。
步骤4 天机JWT
这个部分我们将添加JSON Web Token到应用中,修改run.py
from flask_jwt_extended import JWTManager
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'
jwt = JWTManager(app)
添加JWTManager的引用并初始化,然后修改resources.py
:
from flask_jwt_extended import (create_access_token, create_refresh_token, jwt_required, get_jwt_identity, get_jwt)
先添加所有必须的引用,然后修改UserRegistration
和UserLogin
,当用户认证成功以后返回token:
class UserRegistration(Resource):
def post(self):
data = parser.parse_args()
if UserModel.find_by_username(data['username']):
return {'message': 'User {} already exists'.format(data['username'])}
new_user = UserModel(
username = data['username'],
password = UserModel.generate_hash(data['password'])
)
try:
new_user.save_to_db()
access_token = create_access_token(identity = data['username'])
refresh_token = create_refresh_token(identity = data['username'])
return {
'message': 'User {} was created'.format(data['username']),
'access_token': access_token,
'refresh_token': refresh_token
}
except:
return {'message': 'Something went wrong'}, 500
class UserLogin(Resource):
def post(self):
data = parser.parse_args()
current_user = UserModel.find_by_username(data['username'])
if not current_user:
return {'message': 'User {} doesn\'t exist'.format(data['username'])}
if UserModel.verify_hash(data['password'], current_user.password):
access_token = create_access_token(identity = data['username'])
refresh_token = create_refresh_token(identity = data['username'])
return {
'message': 'Logged in as {}'.format(current_user.username),
'access_token': access_token,
'refresh_token': refresh_token
}
else:
return {'message': 'Wrong credentials'}
所以,当用户成功注册或登录,我们会受到两个token: access token和 refresh token。 我们用create_access_token()
和 create_refresh_token()
来生成token,这两个函数至少要提供identity
参数,简单起见这里可以只将用户名作为identity
但也可以使用更复杂的设计。Access token用于访问被保护的敏感信息,Refresh token用户重新颁发过期的access token
创建一个被保护的资源,我们要用到@jwt_required
装饰器:
class SecretResource(Resource):
@jwt_required()
def get(self):
return {
'answer': 42
}
现在,要访问这个资源你必须添加一个header到请求中,格式是Authorization: Bearer
token需要有一个过期时间,默认情况下access token有15分钟的有效期,refresh tokens是30天,为了让用户不要太频繁的登录,你可以用refresh token来刷新access token,这通常要做一个额外的接口:
class TokenRefresh(Resource):
@jwt_required(refresh=True)
def post(self):
current_user = get_jwt_identity()
access_token = create_access_token(identity=current_user)
return {'access_token': access_token}
这个接口用 jwt_required(refresh=True) 装饰器, 意味着你可以用refresh token来访问。为了确定用户身份,我们使用get_jwt_identity()从refresh token中抽取身份信息,然后使用这个身份生成新的access token并返回给用户。
到这里整个JWT的认证机制就讲完了。
步骤5 登出和注销
当用户要登出的时候,我们不能只删除客户端的token,因为在服务器上token依然有效(直到过期),因此登出意味着我们要讲这个token加到黑名单里,然后每一个请求带的token我们都要和这个黑名单比对,如果匹配上了则不允许进入,下面是一个最简单的登录实现,在models.py
中添加:
class RevokedTokenModel(db.Model):
__tablename__ = 'revoked_tokens'
id = db.Column(db.Integer, primary_key=True)
jti = db.Column(db.String(120))
def add(self):
db.session.add(self)
db.session.commit()
@classmethod
def is_jti_blacklisted(cls, jti):
query = cls.query.filter_by(jti=jti).first()
return bool(query)
这个简单的模型只存储主键id和jti-token的唯一标识,s_jti_blacklisted() 检查这个token是否已被撤销。修改run.py
app.config['JWT_BLOCKLIST_TOKEN_CHECKS'] = ['access', 'refresh']
@jwt.token_in_blocklist_loader
def check_if_token_in_blacklist(jwt_header, decrypted_token):
jti = decrypted_token['jti']
return models.RevokedTokenModel.is_jti_blacklisted(jti)
token_in_blocklist_loader
装饰器是一个回调函数,每次客户端请求被保护的接口时都会调用这个函数,函数要根据token是否在blocklist返回True或False,修改resources.py
:
class UserLogoutAccess(Resource):
@jwt_required()
def post(self):
jti = get_jwt()['jti']
try:
revoked_token = RevokedTokenModel(jti=jti)
revoked_token.add()
return {'message': 'Access token has been revoked'}
except:
return {'message': 'Something went wrong'}, 500
class UserLogoutRefresh(Resource):
@jwt_required(refresh=True)
def post(self):
jti = get_jwt()['jti']
try:
revoked_token = RevokedTokenModel(jti=jti)
revoked_token.add()
return {'message': 'Refresh token has been revoked'}
except:
return {'message': 'Something went wrong'}, 500
结束!