Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def verify(self, statement, auth_data, client_data_hash):
# Checks an attestation statement whose "response" entry is a dot-separated,
# three-part token (header, payload, signature).
# NOTE(review): indentation is lost in this view and the snippet appears
# truncated -- it stops right after reading the common name; no further
# check or return is visible here.
jwt = statement["response"]
# Split into the three segments and run each through websafe_decode.
header, payload, sig = (websafe_decode(x) for x in jwt.split(b"."))
data = json.loads(payload.decode("utf8"))
# Unless self.allow_rooted is set, the payload must carry ctsProfileMatch == True.
if not self.allow_rooted and data["ctsProfileMatch"] is not True:
raise InvalidData("ctsProfileMatch must be true!")
# The decoded payload nonce must equal sha256(auth_data + client_data_hash).
expected_nonce = sha256(auth_data + client_data_hash)
if not bytes_eq(expected_nonce, websafe_decode(data["nonce"])):
raise InvalidData("Nonce does not match!")
# `data` is rebound: from here on it holds the parsed token header, not the payload.
data = json.loads(header.decode("utf8"))
# Load every entry of the header's "x5c" list with x509.load_der_x509_certificate.
certs = [
x509.load_der_x509_certificate(websafe_decode(x), default_backend())
for x in data["x5c"]
]
# self._ca goes to the end of the list; the first certificate is taken off the front.
certs.append(self._ca)
cert = certs.pop(0)
cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
def verify(self, statement, auth_data, client_data_hash):
# Checks an attestation statement whose "response" entry is a dot-separated,
# three-part token (header, payload, signature).
# NOTE(review): indentation is lost in this view and the snippet appears
# truncated inside the final `while` loop.
jwt = statement["response"]
# Split into the three segments and run each through websafe_decode.
header, payload, sig = (websafe_decode(x) for x in jwt.split(b"."))
data = json.loads(payload.decode("utf8"))
# Unless self.allow_rooted is set, the payload must carry ctsProfileMatch == True.
if not self.allow_rooted and data["ctsProfileMatch"] is not True:
raise InvalidData("ctsProfileMatch must be true!")
# The decoded payload nonce must equal sha256(auth_data + client_data_hash).
expected_nonce = sha256(auth_data + client_data_hash)
if not bytes_eq(expected_nonce, websafe_decode(data["nonce"])):
raise InvalidData("Nonce does not match!")
# `data` is rebound: from here on it holds the parsed token header, not the payload.
data = json.loads(header.decode("utf8"))
# Load every entry of the header's "x5c" list with x509.load_der_x509_certificate.
certs = [
x509.load_der_x509_certificate(websafe_decode(x), default_backend())
for x in data["x5c"]
]
# self._ca goes to the end of the list; the first certificate is taken off the front.
certs.append(self._ca)
cert = certs.pop(0)
cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
# The first common-name value must be exactly "attest.android.com".
# NOTE(review): cn[0] raises IndexError when the subject has no common name -- confirm
# whether callers expect InvalidData in that case.
if cn[0].value != "attest.android.com":
raise InvalidData("Certificate not issued to attest.android.com!")
# Verify `sig` over everything before the last "." of the token, using a CoseKey
# chosen by the header's "alg" and built from the first certificate's public key.
CoseKey.for_name(data["alg"]).from_cryptography_key(cert.public_key()).verify(
jwt.rsplit(b".", 1)[0], sig
)
# Walk the remaining certificates, remembering the previous one as `child`.
# NOTE(review): what is done with child/cert on each step is not visible in this snippet.
while certs:
child = cert
cert = certs.pop(0)
def test_websafe_decode(self):
    # Table of (encoded, expected) pairs: unpadded encodings of each prefix
    # of b"foobar", checked in order; the first mismatch fails the test.
    vectors = (
        (b"", b""),
        (b"Zg", b"f"),
        (b"Zm8", b"fo"),
        (b"Zm9v", b"foo"),
        (b"Zm9vYg", b"foob"),
        (b"Zm9vYmE", b"fooba"),
        (b"Zm9vYmFy", b"foobar"),
    )
    for encoded, expected in vectors:
        self.assertEqual(websafe_decode(encoded), expected)
def test_websafe_decode(self):
    # Table of (encoded, expected) pairs: unpadded encodings of each prefix
    # of b"foobar", checked in order; the first mismatch fails the test.
    vectors = (
        (b"", b""),
        (b"Zg", b"f"),
        (b"Zm8", b"fo"),
        (b"Zm9v", b"foo"),
        (b"Zm9vYg", b"foob"),
        (b"Zm9vYmE", b"fooba"),
        (b"Zm9vYmFy", b"foobar"),
    )
    for encoded, expected in vectors:
        self.assertEqual(websafe_decode(encoded), expected)
# NOTE(review): fragment of a test body -- the enclosing `def` and the setup of
# client, event, APP_ID and REG_DATA are outside this view.
# Call register with one register request and one already-registered key.
resp = client.register(
APP_ID,
[{"version": "U2F_V2", "challenge": "foobar"}],
[{"version": "U2F_V2", "keyHandle": "a2V5"}],
event=event,
)
# Check which calls were recorded (presumably on mock objects -- confirm in the setup).
event.wait.assert_called()
client.ctap.get_version.assert_called_with()
client.ctap.authenticate.assert_called_once()
client.ctap.register.assert_called()
# Unpack the two positional arguments recorded for ctap.register.
client_param, app_param = client.ctap.register.call_args[0]
# client_param must be the SHA-256 of the decoded "clientData" from the response.
self.assertEqual(sha256(websafe_decode(resp["clientData"])), client_param)
# The decoded "registrationData" must equal REG_DATA.
self.assertEqual(websafe_decode(resp["registrationData"]), REG_DATA)
# app_param must be the SHA-256 of the encoded APP_ID.
self.assertEqual(sha256(APP_ID.encode()), app_param)
def from_b64(cls, data):
    """Build an instance of ``cls`` from ``data`` after passing it through ``websafe_decode``.

    :param data: The encoded value accepted by ``websafe_decode``.
    :return: ``cls`` called with the decoded value as its only argument.
    """
    decoded = websafe_decode(data)
    return cls(decoded)
# NOTE(review): this fragment starts mid-function and ends inside an unclosed
# dict literal; indentation is lost, so the loop/try nesting noted below is inferred.
err('Need fido2 package(s) for webauthn. Consider doing `pip install fido2` (or similar)')
# Enumerate the devices reported by CtapHidDevice; report an error when there are none.
devices = list(CtapHidDevice.list_devices())
if not devices:
err('webauthn configured, but no U2F devices found')
provider = factor.get('provider', '')
log('mfa {0} challenge request [okta_url]'.format(provider))
# Request body for the challenge call: only the state token.
data = {
'stateToken': state_token
}
# Only the third element of the result (bound to j) is used below.
_, _h, j = send_json_req(conf, 'okta', 'webauthn mfa challenge', factor.get('url', ''), data, expected_url=conf.okta_url)
rfactor = j['_embedded']['factor']
profile = rfactor['profile']
# Build the origin string as "<purl[0]>://<purl[1]>" from the parsed okta_url.
purl = parse_url(conf.okta_url)
origin = '{0}://{1}'.format(purl[0], purl[1])
challenge = rfactor['_embedded']['challenge']['challenge']
credentialId = websafe_decode(profile['credentialId'])
# The allow list holds exactly one credential: the decoded credentialId from the profile.
allow_list = [{'type': 'public-key', 'id': credentialId}]
# Try the devices in turn; presumably the first successful assertion breaks out of
# the loop, and a failure prints the traceback and leaves result as None -- confirm nesting.
for dev in devices:
client = Fido2Client(dev, origin)
print('!!! Touch the flashing U2F device to authenticate... !!!')
try:
result = client.get_assertion(purl[1], challenge, allow_list)
dbg(conf.debug, 'assertion.result', result)
break
except Exception:
traceback.print_exc(file=sys.stderr)
result = None
# No usable assertion was obtained: give up with None.
if not result:
return None
assertion, client_data = result[0][0], result[1] # only one cred in allowList, so only one response.
# Start of the follow-up request body; the rest of the dict is outside this view.
data = {
'stateToken': state_token,
def challenge(self):
    """Return this object's ``"challenge"`` entry passed through ``websafe_decode``."""
    encoded = self.get("challenge")
    return websafe_decode(encoded)
def register_complete(self, state, client_data, attestation_object):
"""Verify the correctness of the registration data received from
the client.
:param state: The state data returned by the corresponding
`register_begin`.
:param client_data: The client data.
:param attestation_object: The attestation object.
:return: The authenticator data"""
# NOTE(review): indentation is lost in this view and the snippet appears truncated --
# the docstring promises a return value, but only the validation checks are visible.
# Every failed check below raises ValueError.
# The client data must be of the MAKE_CREDENTIAL type.
if client_data.get("type") != WEBAUTHN_TYPE.MAKE_CREDENTIAL:
raise ValueError("Incorrect type in ClientData.")
# The origin reported by the client must be accepted by self._verify.
if not self._verify(client_data.get("origin")):
raise ValueError("Invalid origin in ClientData.")
# The challenge stored in `state` (decoded) must match the one in the client data.
if not constant_time.bytes_eq(
websafe_decode(state["challenge"]), client_data.challenge
):
raise ValueError("Wrong challenge in response.")
# The RP ID hash in the authenticator data must match self.rp.id_hash.
if not constant_time.bytes_eq(
self.rp.id_hash, attestation_object.auth_data.rp_id_hash
):
raise ValueError("Wrong RP ID hash in response.")
# The authenticator data must report user presence.
if not attestation_object.auth_data.is_user_present():
raise ValueError("User Present flag not set.")
# User verification is only enforced when the stored state marks it as REQUIRED.
if (
state["user_verification"] == UserVerificationRequirement.REQUIRED
and not attestation_object.auth_data.is_user_verified()
):
raise ValueError(
"User verification required, but User Verified flag not set."
)
def register(
self, app_id, register_requests, registered_keys, event=None, on_keepalive=None
):
# NOTE(review): indentation is lost in this view and the snippet may be truncated;
# event and on_keepalive are not used in the visible lines.
self._verify_app_id(app_id)
version = self.ctap.get_version()
# 32 zero bytes, passed as the first argument of the authenticate calls below.
dummy_param = b"\0" * 32
# Check each registered key whose version matches the one reported by self.ctap.
for key in registered_keys:
if key["version"] != version:
continue
# A key may carry its own appId; otherwise the app_id argument is used.
key_app_id = key.get("appId", app_id)
app_param = sha256(key_app_id.encode())
self._verify_app_id(key_app_id)
key_handle = websafe_decode(key["keyHandle"])
try:
self.ctap.authenticate(dummy_param, app_param, key_handle, True)
# Reaching the next line means authenticate returned without raising.
raise ClientError.ERR.DEVICE_INELIGIBLE() # Bad response
except ApduError as e:
# Only USE_NOT_SATISFIED is turned into DEVICE_INELIGIBLE; other APDU codes
# are not re-raised by this handler.
if e.code == APDU.USE_NOT_SATISFIED:
raise ClientError.ERR.DEVICE_INELIGIBLE()
except CtapError as e:
raise _ctap2client_err(e)
# Take the challenge from the first register request whose version matches.
for request in register_requests:
if request["version"] == version:
challenge = request["challenge"]
break
# NOTE(review): presumably the `else` of the `for` loop above (no matching
# request found) -- indentation is lost, confirm against the original file.
else:
raise ClientError.ERR.DEVICE_INELIGIBLE()