Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
payload[k] = v
# add chain of trust
x5cfile = join(statedir, "x5c")
errMsg = "Error opening/processing x5c file"
if exists(x5cfile):
try:
with open(x5cfile) as x:
# serialize the given x5c as a JSON String[]
x5c = x.read().strip()[1:-1].replace('"', '').split(',')
cc = get_cert_claims(x5c)
payload = check_payload(payload, cc)
if payload is None:
errMsg = "Payload claims do not match chain of trust"
raise Exception(errMsg)
token = jwt.JWT(header={"alg": "RS256", "x5c":x5c, "typ": "JWT", "kid": key.key_id},
claims=payload)
token.make_signed_token(key)
return token.serialize()
except Exception as e:
# using without x5c chain of trust should be disabled
print e
raise e
raise Exception("System not initialized. Missing x5c file. Abort!")
statedir = os.getenv('STATEDIR') or '/tmp'
# add chain of trust
x5cfile = join(statedir, "x5c")
errMsg = "Error opening/processing x5c file"
if exists(x5cfile):
try:
with open(x5cfile) as x:
# serialize the given x5c as a JSON String[]
x5c = x.read().strip()[1:-1].replace('"', '').split(',')
cc = get_cert_claims(x5c)
payload = check_payload(payload, cc)
if payload is None:
errMsg = "Payload claims do not match chain of trust"
raise Exception(errMsg)
token = jwt.JWT(header={"alg": "RS256", "x5c":x5c, "typ": "JWT", "kid": key.key_id},
claims=payload)
token.make_signed_token(key)
return token.serialize()
except Exception as e:
# using without x5c chain of trust should be disabled
print e
raise e
token = jwt.JWT(header={"alg": "RS256", "typ": "JWT", "kid": key.key_id},claims=payload)
token.make_signed_token(key)
return token.serialize()
payload["aud"] = args.aud.split(",")
else:
payload["aud"] = args.aud
if args.claims:
# we are using "|" to separate claims,
# because `images` contain "," to separate values
# strip last `|` if any to remove empty claims
for item in args.claims.rstrip('|').split("|"):
# strip out all the doublequotes
item = item.replace('"','')
s = item.split(':')
k = s[0]
v = ':'.join(s[1:])
payload[k] = v
token = jwt.JWT(header={"alg": "RS256", "typ": "JWT", "kid": key.key_id},
claims=payload)
token.make_signed_token(key)
return token.serialize()
def encode_jwt(claims, kid, secret_store, purpose):
    """Sign ``claims`` as an RS256 JWT and return the serialized token.

    The signing key is fetched from ``secret_store`` via
    ``get_private_key_by_kid(purpose, kid)`` and converted with ``as_jwk()``.
    The same ``kid`` is written into the token header.
    """
    signing_key = secret_store.get_private_key_by_kid(purpose, kid).as_jwk()
    jose_header = dict(kid=kid, typ='jwt', alg='RS256')

    signed = jwt.JWT(claims=claims, header=jose_header)
    signed.make_signed_token(signing_key)
    return signed.serialize()
"grant_type": "password",
"nonce": nonce(),
"password": password,
"scope": "onstar gmoc commerce msso",
"timestamp": timestamp(),
"username": username
}
token_auth = jwt.JWT(header={"alg": "HS256", "typ": "JWT"}, claims=data_auth)
token_auth.make_signed_token(signing_key)
token_auth_encoded = token_auth.serialize()
print "REQUEST_AUTH %s" % (token_auth_encoded)
response_auth = requests.post('https://api.gm.com/api/v1/oauth/token', headers=headers_auth, data=token_auth_encoded)
print "RESPONSE_AUTH %d: %s" % (response_auth.status_code, response_auth.text)
response_auth_jwt = jwt.JWT(key=signing_key, jwt=response_auth.text)
response_auth_json = json.loads(response_auth_jwt.claims)
oauth_token = response_auth_json["access_token"]
headers_connect = {
'Accept': 'application/json',
'Authorization': 'Bearer %s' % (oauth_token),
'Accept-Language': 'en',
'Content-Type': 'application/json; charset=UTF-8',
'Host': 'api.gm.com',
'Connection': 'close',
'Accept-Encoding': 'gzip, deflate',
'User-Agent': 'okhttp/3.9.0',
}
data_connect = '{}'
print "REQUEST_CONNECT!"
def decode_jwt(jwt_token, secret_store, purpose, leeway=None):
    """Verify an RS256-signed JWT and return its claims as a dict.

    The verification key is looked up in ``secret_store`` using ``purpose``
    and the ``kid`` extracted from the token header.

    :param jwt_token: serialized JWT to verify
    :param secret_store: object exposing ``get_public_key_by_kid(purpose, kid)``
        whose result offers ``as_jwk()``
    :param purpose: key purpose, passed through to the secret store
    :param leeway: optional value assigned to the token's ``leeway``
        attribute before validation; ``None`` leaves the library default
        untouched, any other value (including ``0``) is applied
    :return: the token claims, parsed from their JSON representation
    :raises InvalidTokenException: when decoding raises InvalidJWSObject,
        InvalidJWSSignature, JWTInvalidClaimFormat, JWTMissingClaim,
        JWTExpired or ValueError; the message is ``repr()`` of that error
    """
    try:
        jwt_kid = extract_kid_from_header(jwt_token)
        logger.info("Decoding JWT", kid=jwt_kid)
        public_jwk = secret_store.get_public_key_by_kid(purpose, jwt_kid).as_jwk()

        # Per the jwcrypto docs, a None value requires the claim to be
        # present without pinning it to one exact value.
        check_claims = {
            "jti": None,
            "exp": None,
            "iat": None,
        }
        # Restrict accepted algorithms so a token cannot choose its own.
        signed_token = jwt.JWT(algs=['RS256'], check_claims=check_claims)

        # Compare against None rather than truthiness: an explicit leeway
        # of 0 ("no clock skew tolerated") must not silently fall back to
        # the library default.
        if leeway is not None:
            signed_token.leeway = leeway

        signed_token.deserialize(jwt_token, key=public_jwk)
        return json.loads(signed_token.claims)
    except (InvalidJWSObject,
            InvalidJWSSignature,
            JWTInvalidClaimFormat,
            JWTMissingClaim,
            JWTExpired,
            ValueError) as e:
        raise InvalidTokenException(repr(e))
def main(args):
    """Load a PEM key and a serialized JWT from disk and deserialize the token.

    :param args: object with ``key`` (path to a PEM key file) and ``jwt``
        (path to a file holding the serialized token)
    :raises Exception: if ``args.key`` is not an existing file
    """
    if not os.path.isfile(args.key):
        raise Exception('Unhandled key type: %s' % args.key)

    # Read the PEM as bytes: JWK.from_pem expects bytes on Python 3, and
    # binary mode yields the same ``str`` as before on Python 2.
    with open(args.key, 'rb') as f:
        pem_data = f.read()
    key = jwk.JWK.from_pem(pem_data)

    with open(args.jwt) as f:
        # Whitespace is never part of a compact-serialized token; a trailing
        # newline from the file would corrupt the last base64url segment.
        raw_jwt = f.read().strip()

    token = jwt.JWT()
    token.deserialize(raw_jwt, key)
def main(args):
    """Load a key and a serialized JWT, then deserialize the token with it.

    When ``args.key`` names an existing file it is read as PEM; otherwise
    the value is handed to ``jwk.JWK.from_uri``.

    :param args: object with ``key`` (PEM file path or key URI) and ``jwt``
        (path to a file holding the serialized token)
    """
    if os.path.isfile(args.key):
        # Read the PEM as bytes: JWK.from_pem expects bytes on Python 3, and
        # binary mode yields the same ``str`` as before on Python 2.
        with open(args.key, 'rb') as f:
            pem_data = f.read()
        key = jwk.JWK.from_pem(pem_data)
    else:
        key = jwk.JWK.from_uri(args.key)

    with open(args.jwt) as f:
        # Whitespace is never part of a compact-serialized token; a trailing
        # newline from the file would corrupt the last base64url segment.
        raw_jwt = f.read().strip()

    token = jwt.JWT()
    token.deserialize(raw_jwt, key)
def make_token(kid: str, software_statement_id: str, client_scopes: str, token_url: str) -> str:
    """Build and sign an RS256 JWT for ``token_url`` and return it serialized.

    ``software_statement_id`` is used as both issuer and subject. The token
    is stamped with the current time and expires 3600 seconds later. The
    signing key is the PEM stored under ``'private_key_pem'`` in ``cache``.
    """
    issued_at = int(time.time())
    expires_at = issued_at + 3600

    jose_header = {'alg': 'RS256', 'kid': kid, 'typ': 'JWT'}
    claim_set = {
        'iss': software_statement_id,
        'sub': software_statement_id,
        'scopes': client_scopes,
        'aud': token_url,
        'jti': str(uuid.uuid4()),
        'iat': issued_at,
        'exp': expires_at,
    }

    unsigned = jwt.JWT(header=jose_header, claims=claim_set)
    pem_bytes = cache.get('private_key_pem').encode('latin-1')
    signing_key = jwk.JWK.from_pem(pem_bytes)
    unsigned.make_signed_token(signing_key)
    return unsigned.serialize()