Django REST framework：定义自己的微信登录验证类（自定义 user 表）

论坛 期权论坛 编程之家     
选择匿名的用户   2021-6-2 20:08   1974   0
import uuid
import warnings
import jwt
from calendar import timegm
from datetime import datetime

from django.contrib.auth import get_user_model
from django.utils.encoding import smart_text
from django.utils.translation import ugettext as _
from rest_framework import exceptions
from rest_framework.authentication import (
    BaseAuthentication, get_authorization_header
)

from rest_framework_jwt.settings import api_settings
# TODO: import our own user table
from app.models import WxUsers

WxUsers = WxUsers  # self-assignment; has no effect beyond re-binding the imported name
# TODO: set this to the field that uniquely identifies a row in our own user table
USERNAME = 'username'


class WXJSONWebTokenAuthentication(BaseAuthentication):
    """JWT authentication backed by the custom ``WxUsers`` table.

    The token is taken from the ``Authorization`` header (or, when no header
    is present and ``JWT_AUTH_COOKIE`` is configured, from that cookie),
    decoded with ``jwt_decode_handler`` and resolved to a ``WxUsers`` row
    through the ``USERNAME`` payload key.
    """

    www_authenticate_realm = 'api'

    def get_jwt_value(self, request):
        """Return the raw token carried by *request*, or ``None``.

        ``None`` is returned when there is no ``Authorization`` header and no
        configured cookie, or when the header uses a different prefix than
        ``JWT_AUTH_HEADER_PREFIX``.

        Raises:
            AuthenticationFailed: the header has the right prefix but carries
                no credentials, or more than one whitespace-separated part.
        """
        auth = get_authorization_header(request).split()
        auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

        if not auth:
            # No header at all: fall back to the cookie when one is configured.
            if api_settings.JWT_AUTH_COOKIE:
                return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
            return None

        if smart_text(auth[0].lower()) != auth_header_prefix:
            return None

        if len(auth) == 1:
            msg = _('Invalid Authorization header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid Authorization header. Credentials string '
                    'should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        return auth[1]

    def authenticate_header(self, request):
        """Return the ``WWW-Authenticate`` header value: prefix plus realm."""
        return '{0} realm="{1}"'.format(api_settings.JWT_AUTH_HEADER_PREFIX, self.www_authenticate_realm)

    def authenticate(self, request):
        """Decode the request's token and return ``(user, token)``.

        Returns ``None`` when the request carries no token, so that other
        authentication classes may be tried.

        Raises:
            AuthenticationFailed: the token is expired, cannot be decoded, is
                otherwise invalid, or does not map to a user.
        """
        jwt_value = self.get_jwt_value(request)
        if jwt_value is None:
            return None

        try:
            payload = jwt_decode_handler(jwt_value)
        # ``ExpiredSignatureError`` is the canonical name; the short alias
        # ``ExpiredSignature`` is not available in PyJWT 2.x.
        except jwt.ExpiredSignatureError:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()

        user = self.authenticate_credentials(payload)

        return (user, jwt_value)

    def authenticate_credentials(self, payload):
        """Return the ``WxUsers`` instance identified by *payload*.

        Raises:
            AuthenticationFailed: the payload has no ``USERNAME`` value, or no
                single ``WxUsers`` row matches it.
        """
        username = payload.get(USERNAME)

        if not username:
            msg = _('Invalid payload.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            # Look the user up by the configured unique field rather than a
            # hard-coded ``username`` so that changing USERNAME keeps working.
            user = WxUsers.objects.get(**{USERNAME: username})
        except (WxUsers.DoesNotExist, WxUsers.MultipleObjectsReturned):
            # Only "no such user" style failures are authentication errors;
            # database/programming errors are left to propagate.
            msg = _('Invalid signature.')
            raise exceptions.AuthenticationFailed(msg)

        return user


# TODO: pass in an instance of our own user table
def jwt_payload_handler(user):
    """Build the JWT claims dictionary for *user*.

    The result always contains ``user_id``, ``username``, ``exp`` and the
    ``USERNAME`` key. ``email`` is added when the user has that attribute,
    ``orig_iat`` when ``JWT_ALLOW_REFRESH`` is enabled, and ``aud`` / ``iss``
    when the corresponding settings are not ``None``. A ``DeprecationWarning``
    is emitted on every call.
    """
    # TODO: Django's default identifier is ``username``; here the claim key
    # comes from our own USERNAME constant instead of get_username_field().
    # TODO: the identifier value is read from the user instance passed in.
    identifier = get_username(user)

    warnings.warn(
        'The following fields will be removed in the future: '
        '`email` and `user_id`. ',
        DeprecationWarning
    )

    claims = {
        'user_id': user.pk,
        'username': identifier,
        'exp': datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA,
    }
    if hasattr(user, 'email'):
        claims['email'] = user.email
    if isinstance(user.pk, uuid.UUID):
        # UUID primary keys are stored as their string form.
        claims['user_id'] = str(user.pk)

    claims[USERNAME] = identifier

    # A brand new token records its original issue time so that it can be
    # refreshed later.
    if api_settings.JWT_ALLOW_REFRESH:
        issued_at = datetime.utcnow()
        claims['orig_iat'] = timegm(issued_at.utctimetuple())

    optional_claims = (
        ('aud', api_settings.JWT_AUDIENCE),
        ('iss', api_settings.JWT_ISSUER),
    )
    for claim_name, configured_value in optional_claims:
        if configured_value is not None:
            claims[claim_name] = configured_value

    return claims


def get_username_field():
    """Return the active user model's ``USERNAME_FIELD``.

    Falls back to ``'username'`` when the user model cannot be resolved or
    does not define ``USERNAME_FIELD``.
    """
    try:
        username_field = get_user_model().USERNAME_FIELD
    except Exception:
        # Deliberate best-effort fallback, but no longer a bare ``except`` so
        # KeyboardInterrupt / SystemExit are not swallowed.
        username_field = 'username'

    return username_field


def get_username(user):
    """Return the identifier of *user*.

    Uses ``user.get_username()`` and falls back to the ``username`` attribute
    when that call raises ``AttributeError``.
    """
    try:
        return user.get_username()
    except AttributeError:
        return user.username


# TODO: define your own JWT_PRIVATE_KEY (the signing secret / salt) in settings
from url_deml.settings import JWT_PRIVATE_KEY


# TODO: encode the payload into a token
def jwt_encode_handler(payload):
    """Sign *payload* with ``JWT_PRIVATE_KEY`` and return the token as text.

    The token is signed with the project-level ``JWT_PRIVATE_KEY`` using
    ``api_settings.JWT_ALGORITHM``. The previously computed (and never used)
    ``key`` local has been removed: evaluating it could trigger a lookup on
    the default user model via ``jwt_get_secret_key`` for no purpose.
    """
    token = jwt.encode(
        payload,
        JWT_PRIVATE_KEY,
        api_settings.JWT_ALGORITHM
    )
    # PyJWT 1.x returns bytes while 2.x returns str; normalise to str.
    if isinstance(token, bytes):
        return token.decode('utf-8')
    return token


def jwt_get_secret_key(payload=None):
    """
    Return the secret key to use for a token.

    When ``JWT_GET_USER_SECRET_KEY`` is configured, the user referenced by
    the payload's ``user_id`` is loaded from the active user model and the
    configured callable's result is returned as a string. Otherwise the
    global ``JWT_SECRET_KEY`` is returned.

    A per-user key gives the option to log out only that user if:
        - token is compromised
        - password is changed
        - etc.
    """
    if not api_settings.JWT_GET_USER_SECRET_KEY:
        return api_settings.JWT_SECRET_KEY

    user_model = get_user_model()
    account = user_model.objects.get(pk=payload.get('user_id'))
    return str(api_settings.JWT_GET_USER_SECRET_KEY(account))


# TODO: decode a token back into its payload
def jwt_decode_handler(token):
    """Verify *token* with ``JWT_PRIVATE_KEY`` and return its payload.

    Honours the same settings that ``jwt_payload_handler`` uses when issuing
    tokens: without passing ``audience`` / ``issuer`` here, a token issued
    with ``JWT_AUDIENCE`` configured would be rejected on decode. Expiration
    checking and leeway follow ``JWT_VERIFY_EXPIRATION`` and ``JWT_LEEWAY``.

    Raises:
        jwt.InvalidTokenError (or a subclass): the token is expired,
            malformed, or fails signature / claim verification.
    """
    options = {
        'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
    }
    return jwt.decode(
        token,
        JWT_PRIVATE_KEY,
        options=options,
        leeway=api_settings.JWT_LEEWAY,
        audience=api_settings.JWT_AUDIENCE,
        issuer=api_settings.JWT_ISSUER,
        algorithms=[api_settings.JWT_ALGORITHM]
    )

# def jwt_decode_handler(token):
#     options = {
#         'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
#     }
#     # get user from token, BEFORE verification, to get user secret key
#     unverified_payload = jwt.decode(token, None, False)
#     secret_key = jwt_get_secret_key(unverified_payload)
#     return jwt.decode(
#         token,
#         api_settings.JWT_PUBLIC_KEY or secret_key,
#         api_settings.JWT_VERIFY,
#         options=options,
#         leeway=api_settings.JWT_LEEWAY,
#         audience=api_settings.JWT_AUDIENCE,
#         issuer=api_settings.JWT_ISSUER,
#         algorithms=[api_settings.JWT_ALGORITHM]
#     )

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP