用户登录

微信平台

lib目录下的,封装wxauth.py代码。

import requests
from .WXBizDataCrypt import WXBizDataCrypt

WXAPP_ID = 'wxdab2c9193cb7e2a9'

WXAPP_SECRET = '********'


def get_wxapp_session_key(code):
    url = 'https://api.weixin.qq.com/sns/jscode2session?appid=%s&secret=%s&js_code=%s&grant_type=authorization_code' % (
    WXAPP_ID, WXAPP_SECRET, code)
    data = requests.get(url).json()
    print(data)
    return data


def get_user_info(encryptedData, iv, session_key):
    pc = WXBizDataCrypt(WXAPP_ID, session_key)
    return pc.decrypt(encryptedData, iv)

WXBizDataCrypt.py

import base64
import json
from Crypto.Cipher import AES


class WXBizDataCrypt:
    def __init__(self, appId, sessionKey):
        self.appId = appId
        self.sessionKey = sessionKey

    def decrypt(self, encryptedData, iv):
        # base64 decode
        sessionKey = base64.b64decode(self.sessionKey)
        encryptedData = base64.b64decode(encryptedData)
        iv = base64.b64decode(iv)

        cipher = AES.new(sessionKey, AES.MODE_CBC, iv)

        decrypted = json.loads(self._unpad(cipher.decrypt(encryptedData)))

        if decrypted['watermark']['appid'] != self.appId:
            raise Exception('Invalid Buffer')

        return decrypted

    def _unpad(self, s):
        return s[:-ord(s[len(s)-1:])]

Json Web Token(JWT)

JWT简介

JSON Web Token(JWT)是一个非常轻巧的规范。这个规范允许我们使用JWT在两个组织之间传递安全可靠的信息。

JWT是一个有着简单的统一表达形式的字符串:

头部(Header)

头部用于描述关于该JWT的最基本的信息,例如其类型以及签名所用的算法等。 JSON内容要经Base64 编码生成字符串成为Header。

载荷(PayLoad)

payload的五个字段都是由JWT的标准所定义的。

  • iss: 该JWT的签发者
  • sub: 该JWT所面向的用户
  • aud: 接收该JWT的一方
  • exp(expires): 什么时候过期,这里是一个Unix时间戳
  • iat(issued at): 在什么时候签发的

后面的信息可以按需补充。 JSON内容要经Base64 编码生成字符串成为PayLoad。

签名(signature)

这个部分header与payload通过header中声明的加密方式,使用密钥secret进行加密,生成签名。 JWS的主要目的是保证了数据在传输过程中不被修改,验证数据的完整性。但由于仅采用Base64对消息内容编码,因此不保证数据的不可泄露性。所以不适合用于传输敏感数据。

JWT的Python库

安装

pip install pyjwt

使用示例


>>> import jwt

>>> encoded_jwt = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256')
>>> encoded_jwt
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'

>>> jwt.decode(encoded_jwt, 'secret', algorithms=['HS256'])
{'some': 'payload'}

jwt工具的封装

flask原生提供current_app, application 一旦启动,就可以通过current_app.config获取当前application的所有配置。

安装pyjwt模块,在lib目录下,封装jwt_utils.py封装工具代码:

import jwt
# 导入current_app,获取配置信息
from flask import current_app
# jwt工具的封装
# 步骤:
# 1.导入jwt模块
# 2.封装jwt生成的函数,必须要有密钥secret_key
# 3.封装jwt校验的函数


def generate_jwt(payload,expire,secret_key=None):
    # 参数:
    # payload表示存储的用户信息
    # expire表示jwt的过期时间
    # secret_key表示密钥
    _payload = {'exp':expire}
    _payload.update(payload)
    # 判断是否传入密钥
    if not secret_key:
        secret_key = current_app.config['SECRET_KEY']
    token = jwt.encode(_payload,secret_key,algorithm='HS256')
    return token.decode()
    pass


def verify_jwt(token,secret_key=None):
    # 参数:
    # token表示需要校验的jwt/token
    # secret_key表示密钥
    if not secret_key:
        secret_key = current_app.config.get('SECRET_KEY')
    try:
        payload = jwt.decode(token,secret_key,algorithms=['HS256'])
    except jwt.PyJWTError:
        payload = None

    return payload
    pass

在配置config.py下面配置密钥secret_key

# 封装配置的基类
class BaseConfig(object):
    SECRET_KEY = 'FA-LEUOAZDUFK34Lsfdzf-q340=34q234'
    JWT_EXPIRE_TIME = 24

用户登录

登录接口的实现步骤:

  • 获取参数code
  • 获取参数iv、envryptedData
  • 调用微信工具,获取session_key
  • 根据session_key,调用微信工具,获取用户信息
  • 判断是否获取到openID
  • 保存用户数据
    • 查询mysql数据库,判断openID是否存在
    • 如果openID不存在,保存用户信息
    • 否则,更新用户信息
  • 调用jwt工具,生成token
  • 返回数据
  • 封装工具,生成token的有效期
    • 有效期:24小时;
    • 当前时间:datetime获取当前时间,时间差操作timedelta

applet_app/user.py文件中实现业务。

代码实现

1、创建蓝图,第三方登录

# Flask基本程序实现
from flask import Blueprint,request,jsonify,current_app
# 导入日期模块
from datetime import datetime,timedelta

# 导入微信工具
from lib.wxauth import get_wxapp_session_key,get_user_info
# 导入模型类
from models.user import User
# 导入数据库sqlalchemy对象
from models import db
# 导入jwt工具
from lib.jwt_utils import generate_jwt


# 创建蓝图对象
user_bp = Blueprint('user_bp',__name__,url_prefix='/users')

def _generate_jwt_token(user_id):
    # 参数:user_id表示生成token的载荷中存储用户信息
    # 步骤:
    # 1、生成当前时间
    now = datetime.utcnow()
    # 2、根据时间差,指定token的过期时间,
    # expire = now + timedelta(hours=24)
    expiry = now + timedelta(hours=current_app.config.get("JWT_EXPIRE_TIME"))
    # 3、调用jwt工具,传入过期时间
    token = generate_jwt({'user_id':user_id},expire=expiry)
    # 4、返回token
    return token
    pass

@user_bp.route("/login",methods=['POST'])
def login():
    #- 1、获取参数code,用户登录凭证,有效期五分钟
    code = request.json.get('code','')
    #- 2、获取参数iv、envryptedData
    iv = request.json.get('iv','')
    envryptedData = request.json.get('envryptedData','')
    # 判断参数是否存在
    if not iv or not envryptedData or not code:
        return jsonify(msg='参数错误'),403
    #- 3、调用微信工具,获取session_key
    data = get_wxapp_session_key(code)
    if 'session_key' not in data:
        return jsonify(msg='获取session_key信息失败',data=data),500
    #- 4、根据session_key,调用微信工具,获取用户信息
    session_key = data['session_key']
    user_info = get_user_info(envryptedData,iv,session_key)
    #- 5、判断是否获取到openID
    if 'openId' not in user_info:
        return jsonify(msg='获取用户信息失败',user_info=user_info),403
    #- 6、保存用户数据
    #- 查询mysql数据库,判断openID是否存在
    openid = user_info['openId']
    # User.query.filter(User.openId==openid).first()
    user = User.query.filter_by(openId=openid).first()
    if not user:
        user = User(user_info)
        db.session.add(user)
        # flush表示把当前的模型类对象,刷到数据库中
        db.session.flush()
    #- 如果用户存在,更新用户信息
    else:
        user.update_info(user_info)
        db.session.commit()
    # - 7、调用jwt工具,生成token
    token = _generate_jwt_token(user.id)
    # - 8、返回数据
    ret_data = {
        'token':token,
        'user_info':{
            'uid':user.id,
            'gender':user.gender,
            'avatarUrl':user.avatarUrl
        },
        "config": {
            "preference": user.preference,
            "brightness": user.brightness,
            "fontSize": user.fontSize,
            "background": user.background,
            "turn": user.turn
        }
    }
    return jsonify(ret_data)
    pass

2、为了后续项目功能演示,可以添加测试用户,用来测试功能。

定义视图,添加测试用户

@user_bp.route('/tmp_add_user', methods=['POST'])
def tmp_add_user():
    """
    添加测试用户
    :return:
    """
    data = dict(
        openId='1'*32,
        nickName='测试用户002',
        gender=1,
        city='广州市',
        province='广东省',
        country='中国',
        avatarUrl='default'
    )
    user = User(data)
    db.session.add(user)
    db.session.commit()
    # 后面完成
    # _add_book_shelf(user.id, user.gender)

    ret_data = {
        'msg': '添加成功',
        'user_id': user.id,
    }
    return jsonify(ret_data)

登录接口测试代码:

@user_bp.route('/tmp_login')
def tmp_login():
    """
    登录测试接口
    :return:
    """
    user_id = request.args.get('user_id')
    user = User.query.get(user_id)

    # 生成jwt token
    token = _generate_jwt_token(user_id)

    ret_data = {
        "token": token,
        "userInfo": {
            "uid": user.id,
            "gender": user.gender,
            "avatarUrl": user.avatarUrl
        },
        "config": {
            "preference": user.preference,
            "brightness": user.brightness,
            "fontSize": user.fontSize,
            "background": user.background,
            "turn": user.turn
        }
    }
    return jsonify(ret_data)

使用postman对接口进行测试:

查看MySQL数据库,确认用户添加成功:

用户认证

校验用户

用户登录完成后,在项目的后续接口中,有些需要用户登录才能访问的接口,所以,需要校验用户身份信息。

我们需要在每次请求前,从请求的JWTtoken的payload中取出用户id,用来校验用户权限,可以通过请求钩子实现,在项目目录/lib/middlewares.py文件,具体见代码:

实现步骤:

  • 封装工具,/lib/middlewrares.py
  • 定义函数,获取用户头信息,Authorization
  • 从payload中提取用户id,把用户id赋值给g对象
from flask import request,g
from .jwt_utils import verify_jwt

# 请求钩子:
# - 1.封装工具,/lib/middlewrares.py
# - 2.定义函数,获取用户头信息,Authorization
# - 3.使用jwt工具,校验token,从payload中提取用户id,把用户id赋值给g对象
# @app.before_request
# app.before_request(before_request)

def before_request():
    auth = request.headers.get('Authorization')
    if auth:
        payload = verify_jwt(token=auth)
        if payload:
            g.user_id = payload.get("user_id")
        pass

在applet_app文件夹下的__init__.py导入钩子

def create_applet_app(config_name=None):
    # 导入请求钩子,用户的权限校验
    from lib.middlewrares import before_request
    # 相当于@app.before_request
    app.before_request(before_request)

登录验证装饰器

取出用户信息后,我们可以通过装饰器的形式实现用户权限校验,在后续接口中,如果需要用户权限校验,直接添加login_required装饰器即可。

实现步骤:

  • 封装工具,/lib/decoraters.py
  • 定义装饰器
  • 判断用户id是否存在,从g对象中尝试获取用户id
  • 返回结果

具体见代码:

from flask import g,jsonify
import functools
# 1、封装工具,/lib/decoraters.py
# 2、定义装饰器
# 3、判断用户id是否存在,从g对象中尝试获取用户id
# 4、返回结果

def login_required(func):

    # 作用:让被装饰器的装饰的函数的属性(函数名)不发生变化。
    @functools.wraps(func)
    def wrapper(*args,**kwargs):
        if not g.user_id:
            return jsonify(msg='token error'),401
        return func(*args,**kwargs)
    # wrapper.__name__ = func.__name__
    return wrapper

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!