mirror of
https://github.com/RaidMax/IW4M-Admin.git
synced 2025-06-22 05:00:52 -05:00
Add server version to master api
Add IsEvadedOffense to EFPenalty Fix remote log reading in not Windows
This commit is contained in:
139
Master/env_master/Lib/site-packages/flask_jwt_extended/tokens.py
Normal file
139
Master/env_master/Lib/site-packages/flask_jwt_extended/tokens.py
Normal file
@ -0,0 +1,139 @@
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from calendar import timegm
|
||||
|
||||
import jwt
|
||||
from werkzeug.security import safe_str_cmp
|
||||
|
||||
from flask_jwt_extended.exceptions import JWTDecodeError, CSRFError
|
||||
|
||||
|
||||
def _create_csrf_token():
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def _encode_jwt(additional_token_data, expires_delta, secret, algorithm,
|
||||
json_encoder=None):
|
||||
uid = str(uuid.uuid4())
|
||||
now = datetime.datetime.utcnow()
|
||||
token_data = {
|
||||
'iat': now,
|
||||
'nbf': now,
|
||||
'jti': uid,
|
||||
}
|
||||
# If expires_delta is False, the JWT should never expire
|
||||
# and the 'exp' claim is not set.
|
||||
if expires_delta:
|
||||
token_data['exp'] = now + expires_delta
|
||||
token_data.update(additional_token_data)
|
||||
encoded_token = jwt.encode(token_data, secret, algorithm,
|
||||
json_encoder=json_encoder).decode('utf-8')
|
||||
return encoded_token
|
||||
|
||||
|
||||
def encode_access_token(identity, secret, algorithm, expires_delta, fresh,
|
||||
user_claims, csrf, identity_claim_key, user_claims_key,
|
||||
json_encoder=None):
|
||||
"""
|
||||
Creates a new encoded (utf-8) access token.
|
||||
|
||||
:param identity: Identifier for who this token is for (ex, username). This
|
||||
data must be json serializable
|
||||
:param secret: Secret key to encode the JWT with
|
||||
:param algorithm: Which algorithm to encode this JWT with
|
||||
:param expires_delta: How far in the future this token should expire
|
||||
(set to False to disable expiration)
|
||||
:type expires_delta: datetime.timedelta or False
|
||||
:param fresh: If this should be a 'fresh' token or not. If a
|
||||
datetime.timedelta is given this will indicate how long this
|
||||
token will remain fresh.
|
||||
:param user_claims: Custom claims to include in this token. This data must
|
||||
be json serializable
|
||||
:param csrf: Whether to include a csrf double submit claim in this token
|
||||
(boolean)
|
||||
:param identity_claim_key: Which key should be used to store the identity
|
||||
:param user_claims_key: Which key should be used to store the user claims
|
||||
:return: Encoded access token
|
||||
"""
|
||||
|
||||
if isinstance(fresh, datetime.timedelta):
|
||||
now = datetime.datetime.utcnow()
|
||||
fresh = timegm((now + fresh).utctimetuple())
|
||||
|
||||
token_data = {
|
||||
identity_claim_key: identity,
|
||||
'fresh': fresh,
|
||||
'type': 'access',
|
||||
}
|
||||
|
||||
# Don't add extra data to the token if user_claims is empty.
|
||||
if user_claims:
|
||||
token_data[user_claims_key] = user_claims
|
||||
|
||||
if csrf:
|
||||
token_data['csrf'] = _create_csrf_token()
|
||||
return _encode_jwt(token_data, expires_delta, secret, algorithm,
|
||||
json_encoder=json_encoder)
|
||||
|
||||
|
||||
def encode_refresh_token(identity, secret, algorithm, expires_delta, csrf,
|
||||
identity_claim_key, json_encoder=None):
|
||||
"""
|
||||
Creates a new encoded (utf-8) refresh token.
|
||||
|
||||
:param identity: Some identifier used to identify the owner of this token
|
||||
:param secret: Secret key to encode the JWT with
|
||||
:param algorithm: Which algorithm to use for the toek
|
||||
:param expires_delta: How far in the future this token should expire
|
||||
(set to False to disable expiration)
|
||||
:type expires_delta: datetime.timedelta or False
|
||||
:param csrf: Whether to include a csrf double submit claim in this token
|
||||
(boolean)
|
||||
:param identity_claim_key: Which key should be used to store the identity
|
||||
:return: Encoded refresh token
|
||||
"""
|
||||
token_data = {
|
||||
identity_claim_key: identity,
|
||||
'type': 'refresh',
|
||||
}
|
||||
if csrf:
|
||||
token_data['csrf'] = _create_csrf_token()
|
||||
return _encode_jwt(token_data, expires_delta, secret, algorithm,
|
||||
json_encoder=json_encoder)
|
||||
|
||||
|
||||
def decode_jwt(encoded_token, secret, algorithm, identity_claim_key,
|
||||
user_claims_key, csrf_value=None):
|
||||
"""
|
||||
Decodes an encoded JWT
|
||||
|
||||
:param encoded_token: The encoded JWT string to decode
|
||||
:param secret: Secret key used to encode the JWT
|
||||
:param algorithm: Algorithm used to encode the JWT
|
||||
:param identity_claim_key: expected key that contains the identity
|
||||
:param user_claims_key: expected key that contains the user claims
|
||||
:param csrf_value: Expected double submit csrf value
|
||||
:return: Dictionary containing contents of the JWT
|
||||
"""
|
||||
# This call verifies the ext, iat, and nbf claims
|
||||
data = jwt.decode(encoded_token, secret, algorithms=[algorithm])
|
||||
|
||||
# Make sure that any custom claims we expect in the token are present
|
||||
if 'jti' not in data:
|
||||
raise JWTDecodeError("Missing claim: jti")
|
||||
if identity_claim_key not in data:
|
||||
raise JWTDecodeError("Missing claim: {}".format(identity_claim_key))
|
||||
if 'type' not in data or data['type'] not in ('refresh', 'access'):
|
||||
raise JWTDecodeError("Missing or invalid claim: type")
|
||||
if data['type'] == 'access':
|
||||
if 'fresh' not in data:
|
||||
raise JWTDecodeError("Missing claim: fresh")
|
||||
if user_claims_key not in data:
|
||||
data[user_claims_key] = {}
|
||||
if csrf_value:
|
||||
if 'csrf' not in data:
|
||||
raise JWTDecodeError("Missing claim: csrf")
|
||||
if not safe_str_cmp(data['csrf'], csrf_value):
|
||||
raise CSRFError("CSRF double submit tokens do not match")
|
||||
return data
|
Reference in New Issue
Block a user