Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# XXX forbid duplicate keys in JSON input using object_pairs_hook (2.7+)
recipients = jwsjs["recipients"]
encoded_payload = binary(jwsjs["payload"])
headers = []
for recipient in recipients:
assertTrue(len(recipient) == 2, "Unknown recipient key {0}".format(recipient))
h = binary(recipient["header"])
s = binary(recipient["signature"])
header = json.loads(native(urlsafe_b64decode(h)))
assertTrue(header["alg"] == ALG,
"Unexpected algorithm {0}".format(header["alg"]))
if "alg" in header["jwk"] and "kty" not in header["jwk"]:
header["jwk"]["kty"] = header["jwk"]["alg"] # b/w for JWK < -08
assertTrue(header["jwk"]["kty"] == ALG, # true for Ed25519
"Unexpected key type {0}".format(header["jwk"]["kty"]))
vk = urlsafe_b64decode(binary(header["jwk"]["vk"]))
secured_input = b".".join((h, encoded_payload))
sig = urlsafe_b64decode(s)
sig_msg = sig+secured_input
verified_input = native(ed25519ll.crypto_sign_open(sig_msg, vk))
verified_header, verified_payload = verified_input.split('.')
verified_header = binary(verified_header)
decoded_header = native(urlsafe_b64decode(verified_header))
headers.append(json.loads(decoded_header))
verified_payload = binary(verified_payload)
# only return header, payload that have passed through the crypto library.
payload = json.loads(native(urlsafe_b64decode(verified_payload)))
return headers, payload
raise BadWheelFile(msg.format(payload['hash'],
native(record_digest)))
reader = csv.reader((native(r, 'utf-8') for r in record.splitlines()))
for row in reader:
filename = row[0]
hash = row[1]
if not hash:
if filename not in (record_name, sig_name):
print("%s has no hash!" % filename, file=sys.stderr)
continue
algo, data = row[1].split('=', 1)
assert algo == "sha256", "Unsupported hash algorithm"
zipfile.set_expected_hash(filename, urlsafe_b64decode(binary(data)))
s = binary(recipient["signature"])
header = json.loads(native(urlsafe_b64decode(h)))
assertTrue(header["alg"] == ALG,
"Unexpected algorithm {0}".format(header["alg"]))
if "alg" in header["jwk"] and "kty" not in header["jwk"]:
header["jwk"]["kty"] = header["jwk"]["alg"] # b/w for JWK < -08
assertTrue(header["jwk"]["kty"] == ALG, # true for Ed25519
"Unexpected key type {0}".format(header["jwk"]["kty"]))
vk = urlsafe_b64decode(binary(header["jwk"]["vk"]))
secured_input = b".".join((h, encoded_payload))
sig = urlsafe_b64decode(s)
sig_msg = sig+secured_input
verified_input = native(ed25519ll.crypto_sign_open(sig_msg, vk))
verified_header, verified_payload = verified_input.split('.')
verified_header = binary(verified_header)
decoded_header = native(urlsafe_b64decode(verified_header))
headers.append(json.loads(decoded_header))
verified_payload = binary(verified_payload)
# only return header, payload that have passed through the crypto library.
payload = json.loads(native(urlsafe_b64decode(verified_payload)))
return headers, payload
line = line.decode('utf-8')
path, hash_sum, size = line.rsplit(u',', 2)
if hash_sum:
algorithm, hash_sum = hash_sum.split(u'=')
try:
hashlib.new(algorithm)
except ValueError:
raise WheelError('Unsupported hash algorithm: {}'.format(algorithm))
if algorithm.lower() in {'md5', 'sha1'}:
raise WheelError(
'Weak hash algorithm ({}) is not permitted by PEP 427'
.format(algorithm))
self._file_hashes[path] = (
algorithm, urlsafe_b64decode(hash_sum.encode('ascii')))
line = line.decode('utf-8')
path, hash_sum, size = line.rsplit(u',', 2)
if hash_sum:
algorithm, hash_sum = hash_sum.split(u'=')
try:
hashlib.new(algorithm)
except ValueError:
raise WheelError('Unsupported hash algorithm: {}'.format(algorithm))
if algorithm.lower() in {'md5', 'sha1'}:
raise WheelError(
'Weak hash algorithm ({}) is not permitted by PEP 427'
.format(algorithm))
self._file_hashes[path] = (
algorithm, urlsafe_b64decode(hash_sum.encode('ascii')))
assertTrue(header["jwk"]["kty"] == ALG, # true for Ed25519
"Unexpected key type {0}".format(header["jwk"]["kty"]))
vk = urlsafe_b64decode(binary(header["jwk"]["vk"]))
secured_input = b".".join((h, encoded_payload))
sig = urlsafe_b64decode(s)
sig_msg = sig+secured_input
verified_input = native(ed25519ll.crypto_sign_open(sig_msg, vk))
verified_header, verified_payload = verified_input.split('.')
verified_header = binary(verified_header)
decoded_header = native(urlsafe_b64decode(verified_header))
headers.append(json.loads(decoded_header))
verified_payload = binary(verified_payload)
# only return header, payload that have passed through the crypto library.
payload = json.loads(native(urlsafe_b64decode(verified_payload)))
return headers, payload
line = line.decode('utf-8')
path, hash_sum, size = line.rsplit(u',', 2)
if hash_sum:
algorithm, hash_sum = hash_sum.split(u'=')
try:
hashlib.new(algorithm)
except ValueError:
raise WheelError('Unsupported hash algorithm: {}'.format(algorithm))
if algorithm.lower() in {'md5', 'sha1'}:
raise WheelError(
'Weak hash algorithm ({}) is not permitted by PEP 427'
.format(algorithm))
self._file_hashes[path] = (
algorithm, urlsafe_b64decode(hash_sum.encode('ascii')))
def verify(jwsjs):
"""Return (decoded headers, payload) if all signatures in jwsjs are
consistent, else raise ValueError.
Caller must decide whether the keys are actually trusted."""
get_ed25519ll()
# XXX forbid duplicate keys in JSON input using object_pairs_hook (2.7+)
recipients = jwsjs["recipients"]
encoded_payload = binary(jwsjs["payload"])
headers = []
for recipient in recipients:
assertTrue(len(recipient) == 2, "Unknown recipient key {0}".format(recipient))
h = binary(recipient["header"])
s = binary(recipient["signature"])
header = json.loads(native(urlsafe_b64decode(h)))
assertTrue(header["alg"] == ALG,
"Unexpected algorithm {0}".format(header["alg"]))
if "alg" in header["jwk"] and "kty" not in header["jwk"]:
header["jwk"]["kty"] = header["jwk"]["alg"] # b/w for JWK < -08
assertTrue(header["jwk"]["kty"] == ALG, # true for Ed25519
"Unexpected key type {0}".format(header["jwk"]["kty"]))
vk = urlsafe_b64decode(binary(header["jwk"]["vk"]))
secured_input = b".".join((h, encoded_payload))
sig = urlsafe_b64decode(s)
sig_msg = sig+secured_input
verified_input = native(ed25519ll.crypto_sign_open(sig_msg, vk))
verified_header, verified_payload = verified_input.split('.')
verified_header = binary(verified_header)
decoded_header = native(urlsafe_b64decode(verified_header))
headers.append(json.loads(decoded_header))