import uuid
import warnings
import jwt
from calendar import timegm
from datetime import datetime
from django.contrib.auth import get_user_model
from django.utils.encoding import smart_text
from django.utils.translation import ugettext as _
from rest_framework import exceptions
from rest_framework.authentication import (
BaseAuthentication, get_authorization_header
)
from rest_framework_jwt.settings import api_settings
# TODO: import our own user table
from app.models import WxUsers
# TODO: set this to the unique-identifier field of our own user table.
# It is the payload key written by jwt_payload_handler() and read back by
# WXJSONWebTokenAuthentication.authenticate_credentials().
USERNAME = 'username'
class WXJSONWebTokenAuthentication(BaseAuthentication):
    """Authenticate a request from a JSON Web Token.

    The token is read from the ``Authorization`` header (or, when
    ``JWT_AUTH_COOKIE`` is configured and no header is sent, from that
    cookie), decoded with ``jwt_decode_handler`` and resolved to a
    ``WxUsers`` row through the ``USERNAME`` payload claim.
    """

    # Realm reported in the string built by authenticate_header().
    www_authenticate_realm = 'api'

    def get_jwt_value(self, request):
        """Return the raw token carried by *request*, or ``None``.

        ``None`` is returned when there is no Authorization header (and no
        configured cookie) or when the header uses a different prefix than
        ``JWT_AUTH_HEADER_PREFIX``.

        Raises:
            AuthenticationFailed: the header has the expected prefix but
                carries no credentials, or more than one credential part.
        """
        auth = get_authorization_header(request).split()
        auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

        if not auth:
            # No header at all: fall back to the cookie when one is configured.
            if api_settings.JWT_AUTH_COOKIE:
                return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
            return None

        # A header meant for another authentication scheme is not ours.
        if smart_text(auth[0].lower()) != auth_header_prefix:
            return None

        if len(auth) == 1:
            msg = _('Invalid Authorization header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid Authorization header. Credentials string '
                    'should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        return auth[1]

    def authenticate_header(self, request):
        """Return the challenge string ``<prefix> realm="<realm>"``."""
        return '{0} realm="{1}"'.format(
            api_settings.JWT_AUTH_HEADER_PREFIX, self.www_authenticate_realm
        )

    def authenticate(self, request):
        """Decode the request's token and return ``(user, token)``.

        Returns ``None`` when the request carries no token, leaving the
        request unauthenticated by this class.

        Raises:
            AuthenticationFailed: the token is expired, cannot be decoded,
                is otherwise invalid, or does not map to a user.
        """
        jwt_value = self.get_jwt_value(request)
        if jwt_value is None:
            return None

        try:
            payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignatureError:
            # ``jwt.ExpiredSignature`` is a deprecated alias that PyJWT 2.0
            # removed; ``ExpiredSignatureError`` exists in 1.x and 2.x.
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()

        user = self.authenticate_credentials(payload)
        return (user, jwt_value)

    def authenticate_credentials(self, payload):
        """Return the ``WxUsers`` instance named by *payload*.

        Raises:
            AuthenticationFailed: the payload has no ``USERNAME`` claim, or
                the lookup does not yield exactly one user.
        """
        username = payload.get(USERNAME)
        if not username:
            msg = _('Invalid payload.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            # TODO: this is where our own user is fetched.
            user = WxUsers.objects.get(username=username)
        except (WxUsers.DoesNotExist, WxUsers.MultipleObjectsReturned):
            # Only lookup misses are authentication failures; other errors
            # (e.g. a database outage) must propagate instead of being
            # reported as a bad signature.
            # NOTE(review): assumes WxUsers is a Django model exposing these
            # exception attributes -- confirm against app.models.
            msg = _('Invalid signature.')
            raise exceptions.AuthenticationFailed(msg)

        return user
# TODO: pass in an instance of our own user table
def jwt_payload_handler(user):
    """Build the JWT claims dictionary for *user*.

    The payload always carries ``user_id``, ``username`` and ``exp``.
    ``email`` is added when the user object has that attribute, and
    ``orig_iat``, ``aud`` and ``iss`` are added when the matching
    ``api_settings`` entries are enabled/configured.

    A ``DeprecationWarning`` about ``email`` and ``user_id`` is emitted on
    every call.
    """
    # TODO: Django's default is ``username``; we use our own USERNAME constant
    # as the unique-identifier claim instead of get_username_field().
    claim_name = USERNAME
    # TODO: read the identifying value from the user entity passed in.
    name = get_username(user)

    warnings.warn(
        'The following fields will be removed in the future: '
        '`email` and `user_id`. ',
        DeprecationWarning
    )

    payload = dict(
        user_id=user.pk,
        username=name,
        exp=datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA,
    )

    if hasattr(user, 'email'):
        payload['email'] = user.email

    # UUID primary keys are stored as text.
    if isinstance(user.pk, uuid.UUID):
        payload['user_id'] = str(user.pk)

    payload[claim_name] = name

    # Record the original issue time of a brand new token so that it can be
    # refreshed later.
    if api_settings.JWT_ALLOW_REFRESH:
        issued_at = datetime.utcnow()
        payload['orig_iat'] = timegm(issued_at.utctimetuple())

    # Optional claims, added in this order only when configured.
    for claim, setting_name in (('aud', 'JWT_AUDIENCE'), ('iss', 'JWT_ISSUER')):
        configured = getattr(api_settings, setting_name)
        if configured is not None:
            payload[claim] = configured

    return payload
def get_username_field():
    """Return the active user model's ``USERNAME_FIELD``.

    Falls back to ``'username'`` when the user model cannot be resolved or
    does not define ``USERNAME_FIELD``.
    """
    try:
        username_field = get_user_model().USERNAME_FIELD
    except Exception:
        # Best-effort fallback. ``Exception`` (rather than a bare ``except``)
        # lets KeyboardInterrupt and SystemExit propagate.
        username_field = 'username'
    return username_field
def get_username(user):
    """Return the username of *user*.

    ``user.get_username()`` is tried first; if that raises
    ``AttributeError`` the plain ``user.username`` attribute is returned.
    """
    try:
        return user.get_username()
    except AttributeError:
        return user.username
# TODO: configure our own JWT_PRIVATE_KEY (the secret used to sign tokens)
from url_deml.settings import JWT_PRIVATE_KEY
# TODO: encode (sign) the payload into a token
def jwt_encode_handler(payload):
    """Sign *payload* and return the resulting JWT as text.

    The token is signed with the project-level ``JWT_PRIVATE_KEY`` using the
    algorithm named by ``api_settings.JWT_ALGORITHM``.

    Args:
        payload: Mapping of claims, e.g. the result of
            ``jwt_payload_handler``.

    Returns:
        The encoded token as a text string.
    """
    # The previous implementation also computed an unused ``key`` through
    # jwt_get_secret_key(), which could trigger a pointless (and possibly
    # failing) user lookup; the token has always been signed with
    # JWT_PRIVATE_KEY.
    token = jwt.encode(
        payload,
        JWT_PRIVATE_KEY,
        api_settings.JWT_ALGORITHM
    )
    # PyJWT 1.x returns bytes while PyJWT 2.x already returns str.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return token
def jwt_get_secret_key(payload=None):
    """Return the secret key to use for the token described by *payload*.

    For enhanced security a per-user secret may be used, which makes it
    possible to log out a single user when, for example:
      - the token is compromised
      - the password is changed

    When ``api_settings.JWT_GET_USER_SECRET_KEY`` is set, the user whose
    primary key equals ``payload['user_id']`` is loaded from the active
    user model and the callable's result is returned as ``str``.
    Otherwise ``api_settings.JWT_SECRET_KEY`` is returned.
    """
    secret_getter = api_settings.JWT_GET_USER_SECRET_KEY
    if not secret_getter:
        return api_settings.JWT_SECRET_KEY

    user_model = get_user_model()
    owner = user_model.objects.get(pk=payload.get('user_id'))
    return str(secret_getter(owner))
# TODO: decode the token back into its payload
def jwt_decode_handler(token):
    """Verify *token* and return its decoded payload.

    The signature is checked with the project-level ``JWT_PRIVATE_KEY`` and
    the algorithm named by ``api_settings.JWT_ALGORITHM``.

    ``jwt_payload_handler`` adds ``aud``/``iss`` claims when
    ``JWT_AUDIENCE``/``JWT_ISSUER`` are configured, so the same settings are
    passed here; without them PyJWT rejects any token carrying an ``aud``
    claim. ``JWT_LEEWAY`` is passed through as the clock-skew tolerance.

    Raises:
        jwt.InvalidTokenError: (or a subclass) when verification fails.
    """
    return jwt.decode(
        token,
        JWT_PRIVATE_KEY,
        algorithms=[api_settings.JWT_ALGORITHM],
        leeway=api_settings.JWT_LEEWAY,
        audience=api_settings.JWT_AUDIENCE,
        issuer=api_settings.JWT_ISSUER
    )
# def jwt_decode_handler(token):
# options = {
# 'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
# }
# # get user from token, BEFORE verification, to get user secret key
# unverified_payload = jwt.decode(token, None, False)
# secret_key = jwt_get_secret_key(unverified_payload)
# return jwt.decode(
# token,
# api_settings.JWT_PUBLIC_KEY or secret_key,
# api_settings.JWT_VERIFY,
# options=options,
# leeway=api_settings.JWT_LEEWAY,
# audience=api_settings.JWT_AUDIENCE,
# issuer=api_settings.JWT_ISSUER,
# algorithms=[api_settings.JWT_ALGORITHM]
# )
|