Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
host_spec_path = path.join(ocsp.OCSP_CACHE.CACHE_DIR,"host_spec_bypass_ssd.ssd")
with open(host_spec_path, "w") as jwt_host_spec_fp:
tdelta = timedelta(days=1)
nbf_val = datetime.utcnow()
exp_val = nbf_val+tdelta
header = {'ssd_iss':'dep1'}
payload = {}
hname_string = " ".join(hostname)
acc_name = ocsp.get_account_from_hostname(hostname[0])
payload.update({'sfcEndpoint': hname_string})
payload.update({'certId': '*'})
payload.update({'nbf': nbf_val})
payload.update({'exp': exp_val})
host_spec_jwt_token = jwt.encode(payload, priv_key, algorithm='RS512', headers=header)
host_spec_bypass_ssd = {acc_name: host_spec_jwt_token.decode("utf-8")}
json.dump(host_spec_bypass_ssd, jwt_host_spec_fp)
with self.app.test_request_context():
token = encode_access_token('foo', 'newsecret', 'HS256',
timedelta(minutes=5), True, {},
csrf=False, identity_claim='identity')
auth_header = "Bearer {}".format(token)
response = self.client.get('/partially-protected',
headers={'Authorization': auth_header})
data = json.loads(response.get_data(as_text=True))
status_code = response.status_code
self.assertEqual(status_code, 422)
self.assertIn('msg', data)
# Test with valid token that is missing required claims
now = datetime.utcnow()
token_data = {'exp': now + timedelta(minutes=5)}
encoded_token = jwt.encode(token_data, self.app.config['SECRET_KEY'],
self.app.config['JWT_ALGORITHM']).decode('utf-8')
auth_header = "Bearer {}".format(encoded_token)
response = self.client.get('/partially-protected',
headers={'Authorization': auth_header})
data = json.loads(response.get_data(as_text=True))
status_code = response.status_code
self.assertEqual(status_code, 422)
self.assertIn('msg', data)
Returns:
JSON document.
"""
if reg_key != server_reg_key:
response.status = hug.HTTP_403
return
new_user = User(name=name, gender=gender, birth_date=datetime.strptime(birth_date, "%Y-%m-%d").date())
db_session.add(new_user)
db_session.commit()
data = {}
data['id'] = new_user.id
data['token'] = jwt.encode({'id': new_user.id, 'name': name, 'gender': gender, 'birth_date': birth_date}, Config.SUPER_SECRET_KEY, algorithm='HS256').decode('ascii')
return json.dumps(data, indent=4)
# - https://addons-server.readthedocs.io/en/latest/topics/api/auth.html
# - https://addons-server.readthedocs.io/en/latest/topics/api/signing.html
#
print('Ask AMO to sign self-hosted xpi package...')
with open(unsigned_xpi_filepath, 'rb') as f:
amo_api_key = input_secret('AMO API key', 'amo_api_key')
amo_secret = input_secret('AMO API secret', 'amo_secret')
amo_nonce = os.urandom(8).hex()
jwt_payload = {
'iss': amo_api_key,
'jti': amo_nonce,
'iat': datetime.datetime.utcnow(),
'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=180),
}
jwt_auth = 'JWT ' + jwt.encode(jwt_payload, amo_secret).decode()
headers = { 'Authorization': jwt_auth, }
data = { 'channel': 'unlisted' }
files = { 'upload': f, }
signing_url = 'https://addons.mozilla.org/api/v3/addons/{0}/versions/{1}/'.format(extension_id, ext_version)
print('Submitting package to be signed...')
response = requests.put(signing_url, headers=headers, data=data, files=files)
if response.status_code != 202:
print('Error: Creating new version failed -- server error {0}'.format(response.status_code))
print(response.text)
exit(1)
print('Request for signing self-hosted xpi package succeeded.')
signing_request_response = response.json();
f.close()
print('Waiting for AMO to process the request to sign the self-hosted xpi package...')
# Wait for signed package to be ready
signing_check_url = signing_request_response['url']
def create_token(self, user, expires_at=None):
if not expires_at:
exp = (as_timestamp(datetime_or_now())
+ self.request.session.get_expiry_age())
else:
exp = as_timestamp(expires_at)
payload = UserSerializer().to_representation(user)
payload.update({'exp': exp})
token = jwt.encode(payload, settings.JWT_SECRET_KEY,
settings.JWT_ALGORITHM).decode('utf-8')
LOGGER.info("%s signed in.", user,
extra={'event': 'login', 'request': self.request})
return Response(TokenSerializer().to_representation({'token': token}),
status=status.HTTP_201_CREATED)
def encode_auth_token(self, user_id):
"""
Generates the Auth Token
:return: string
"""
try:
payload = {
'exp': dt.datetime.utcnow() + dt.timedelta(days=0, seconds=5),
'iat': dt.datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
current_app.config.get('SECRET_KEY'),
algorithm='HS256'
)
except Exception as e:
return e
def encode_jwt(payload, node):
"""Encode JSON Web Token"""
# _log.debug("encode_jwt:\n\tpayload={}".format(payload))
if node.runtime_credentials:
private_key = node.runtime_credentials.get_private_key()
else:
_log.error("Node has no runtime credentials, cannot sign JWT")
# Create a JSON Web Token signed using the node's Elliptic Curve private key.
return jwt.encode(payload, private_key, algorithm='ES256')
def pwd_reset_token():
data = request.get_json()['data']['attributes']
if 'email' not in data.keys():
print('Email not found')
email = data['email']
user = User.getUser(email=email)
if not user:
return ErrorResponse(UserNotFound().message, 422, {'Content-Type': 'application/json'}).respond()
expire = datetime.datetime.utcnow() + datetime.timedelta(hours=24)
token = jwt.encode({
'id': user.id,
'exp': expire
}, app.config.get('SECRET_KEY'))
try:
resetObj = ResetPasswordToken.query.get(user.id)
resetObj.token = token
except ProgrammingError:
resetObj = ResetPasswordToken(user.id, token.decode('UTF-8'))
resetObj.save_to_db()
return jsonify(TokenSchema().dump(resetObj).data)
def generate_signed_token(private_pem, request):
import Crypto.PublicKey.RSA as RSA
import jwt
private_key = RSA.importKey(private_pem)
now = datetime.datetime.utcnow()
claims = {
'scope': request.scope,
'exp': now + datetime.timedelta(seconds=request.expires_in)
}
claims.update(request.claims)
token = jwt.encode(claims, private_key, 'RS256')
token = to_unicode(token, "UTF-8")
return token
def login():
data = request.get_json()
user = User.authenticate(**data)
if not user:
return jsonify({ 'message': 'Invalid credentials', 'authenticated': False }), 401
token = jwt.encode({
'sub': user.email,
'iat':datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(minutes=30)},
current_app.config['SECRET_KEY'])
return jsonify({ 'token': token.decode('UTF-8') })