Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def verify(self, statement, auth_data, client_data_hash):
jwt = statement["response"]
header, payload, sig = (websafe_decode(x) for x in jwt.split(b"."))
data = json.loads(payload.decode("utf8"))
if not self.allow_rooted and data["ctsProfileMatch"] is not True:
raise InvalidData("ctsProfileMatch must be true!")
expected_nonce = sha256(auth_data + client_data_hash)
if not bytes_eq(expected_nonce, websafe_decode(data["nonce"])):
raise InvalidData("Nonce does not match!")
data = json.loads(header.decode("utf8"))
certs = [
x509.load_der_x509_certificate(websafe_decode(x), default_backend())
for x in data["x5c"]
]
certs.append(self._ca)
cert = certs.pop(0)
cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
if cn[0].value != "attest.android.com":
raise InvalidData("Certificate not issued to attest.android.com!")
CoseKey.for_name(data["alg"]).from_cryptography_key(cert.public_key()).verify(
def verify(self, statement, auth_data, client_data_hash):
jwt = statement["response"]
header, payload, sig = (websafe_decode(x) for x in jwt.split(b"."))
data = json.loads(payload.decode("utf8"))
if not self.allow_rooted and data["ctsProfileMatch"] is not True:
raise InvalidData("ctsProfileMatch must be true!")
expected_nonce = sha256(auth_data + client_data_hash)
if not bytes_eq(expected_nonce, websafe_decode(data["nonce"])):
raise InvalidData("Nonce does not match!")
data = json.loads(header.decode("utf8"))
certs = [
x509.load_der_x509_certificate(websafe_decode(x), default_backend())
for x in data["x5c"]
]
certs.append(self._ca)
cert = certs.pop(0)
cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
if cn[0].value != "attest.android.com":
raise InvalidData("Certificate not issued to attest.android.com!")
raise InvalidData("ctsProfileMatch must be true!")
expected_nonce = sha256(auth_data + client_data_hash)
if not bytes_eq(expected_nonce, websafe_decode(data["nonce"])):
raise InvalidData("Nonce does not match!")
data = json.loads(header.decode("utf8"))
certs = [
x509.load_der_x509_certificate(websafe_decode(x), default_backend())
for x in data["x5c"]
]
certs.append(self._ca)
cert = certs.pop(0)
cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
if cn[0].value != "attest.android.com":
raise InvalidData("Certificate not issued to attest.android.com!")
CoseKey.for_name(data["alg"]).from_cryptography_key(cert.public_key()).verify(
jwt.rsplit(b".", 1)[0], sig
)
while certs:
child = cert
cert = certs.pop(0)
pub = cert.public_key()
if isinstance(pub, rsa.RSAPublicKey):
pub.verify(
child.signature,
child.tbs_certificate_bytes,
padding.PKCS1v15(),
child.signature_hash_algorithm,
)
def verify(self, statement, auth_data, client_data_hash):
jwt = statement["response"]
header, payload, sig = (websafe_decode(x) for x in jwt.split(b"."))
data = json.loads(payload.decode("utf8"))
if not self.allow_rooted and data["ctsProfileMatch"] is not True:
raise InvalidData("ctsProfileMatch must be true!")
expected_nonce = sha256(auth_data + client_data_hash)
if not bytes_eq(expected_nonce, websafe_decode(data["nonce"])):
raise InvalidData("Nonce does not match!")
data = json.loads(header.decode("utf8"))
certs = [
x509.load_der_x509_certificate(websafe_decode(x), default_backend())
for x in data["x5c"]
]
certs.append(self._ca)
cert = certs.pop(0)
cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
def verify(self, statement, auth_data, client_data_hash):
jwt = statement["response"]
header, payload, sig = (websafe_decode(x) for x in jwt.split(b"."))
data = json.loads(payload.decode("utf8"))
if not self.allow_rooted and data["ctsProfileMatch"] is not True:
raise InvalidData("ctsProfileMatch must be true!")
expected_nonce = sha256(auth_data + client_data_hash)
if not bytes_eq(expected_nonce, websafe_decode(data["nonce"])):
raise InvalidData("Nonce does not match!")
data = json.loads(header.decode("utf8"))
certs = [
x509.load_der_x509_certificate(websafe_decode(x), default_backend())
for x in data["x5c"]
]
certs.append(self._ca)
cert = certs.pop(0)
cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
if cn[0].value != "attest.android.com":
raise InvalidData("Certificate not issued to attest.android.com!")
CoseKey.for_name(data["alg"]).from_cryptography_key(cert.public_key()).verify(
jwt.rsplit(b".", 1)[0], sig
)
while certs:
child = cert
cert = certs.pop(0)
event.wait = mock.MagicMock()
resp = client.sign(
APP_ID,
"challenge",
[{"version": "U2F_V2", "keyHandle": "a2V5"}],
event=event,
)
event.wait.assert_called()
client.ctap.get_version.assert_called_with()
client.ctap.authenticate.assert_called()
client_param, app_param, key_handle = client.ctap.authenticate.call_args[0]
self.assertEqual(client_param, sha256(websafe_decode(resp["clientData"])))
self.assertEqual(app_param, sha256(APP_ID.encode()))
self.assertEqual(key_handle, b"key")
self.assertEqual(websafe_decode(resp["signatureData"]), SIG_DATA)
def test_sign(self):
client = U2fClient(None, APP_ID)
client.ctap = mock.MagicMock()
client.ctap.get_version.return_value = "U2F_V2"
client.ctap.authenticate.return_value = SIG_DATA
resp = client.sign(
APP_ID, "challenge", [{"version": "U2F_V2", "keyHandle": "a2V5"}]
)
client.ctap.get_version.assert_called_with()
client.ctap.authenticate.assert_called_once()
client_param, app_param, key_handle = client.ctap.authenticate.call_args[0]
self.assertEqual(client_param, sha256(websafe_decode(resp["clientData"])))
self.assertEqual(app_param, sha256(APP_ID.encode()))
self.assertEqual(key_handle, b"key")
self.assertEqual(websafe_decode(resp["signatureData"]), SIG_DATA)
resp = client.sign(
APP_ID,
"challenge",
[{"version": "U2F_V2", "keyHandle": "a2V5"}],
event=event,
)
event.wait.assert_called()
client.ctap.get_version.assert_called_with()
client.ctap.authenticate.assert_called()
client_param, app_param, key_handle = client.ctap.authenticate.call_args[0]
self.assertEqual(client_param, sha256(websafe_decode(resp["clientData"])))
self.assertEqual(app_param, sha256(APP_ID.encode()))
self.assertEqual(key_handle, b"key")
self.assertEqual(websafe_decode(resp["signatureData"]), SIG_DATA)
def test_make_credential(self):
ctap = CTAP2(mock.MagicMock())
ctap.device.call.return_value = b"\0" + _MC_RESP
resp = ctap.make_credential(1, 2, 3, 4)
ctap.device.call.assert_called_with(
0x10, b"\1" + cbor.encode({1: 1, 2: 2, 3: 3, 4: 4}), mock.ANY, None
)
self.assertIsInstance(resp, AttestationObject)
self.assertEqual(resp, _MC_RESP)
self.assertEqual(resp.fmt, "packed")
self.assertEqual(resp.auth_data, _AUTH_DATA_MC)
self.assertSetEqual(set(resp.att_statement.keys()), {"alg", "sig", "x5c"})
ou = ous[0]
if ou.value != "Authenticator Attestation":
raise InvalidData('Subject must have OU = "Authenticator Attestation"!')
cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
if not cn:
raise InvalidData("Subject must have CN set!")
if enforce_empty_subject:
s = cert.subject.get_attributes_for_oid(x509.NameOID)
if s:
raise InvalidData("Certificate should not have Subject")
if has_subject_alternative_name:
s = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
if not s:
raise InvalidData("Certificate should have SubjectAlternativeName")
if has_aik_certificate:
ext = cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
has_aik = [x == OID_AIK_CERTIFICATE for x in ext.value]
if True not in has_aik:
raise InvalidData(
'Extended key usage MUST contain the "joint-iso-itu-t(2) '
"internationalorganizations(23) 133 tcg-kp(8) "
'tcg-kp-AIKCertificate(3)" OID.'
)
bc = cert.extensions.get_extension_for_class(x509.BasicConstraints)
if bc.value.ca:
raise InvalidData("Attestation certificate must have CA=false!")
try:
ext = cert.extensions.get_extension_for_oid(OID_AAGUID)
if ext.critical: