Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_register(self):
    """Register through a mocked CTAP object and check what it received.

    The mock reports version "U2F_V2", raises ``ApduError(APDU.WRONG_DATA)``
    from ``authenticate`` and returns ``REG_DATA`` from ``register``.
    """
    u2f = U2fClient(None, APP_ID)
    fake_ctap = u2f.ctap = mock.MagicMock()
    fake_ctap.get_version.return_value = "U2F_V2"
    fake_ctap.authenticate.side_effect = ApduError(APDU.WRONG_DATA)
    fake_ctap.register.return_value = REG_DATA

    request_list = [{"version": "U2F_V2", "challenge": "foobar"}]
    key_list = [{"version": "U2F_V2", "keyHandle": "a2V5"}]
    response = u2f.register(APP_ID, request_list, key_list)

    # Each CTAP entry point must have been exercised.
    fake_ctap.get_version.assert_called_with()
    fake_ctap.authenticate.assert_called_once()
    fake_ctap.register.assert_called_once()

    # register() must have been given exactly two positional arguments.
    sent_client_param, sent_app_param = fake_ctap.register.call_args[0]

    self.assertEqual(
        sha256(websafe_decode(response["clientData"])), sent_client_param
    )
    self.assertEqual(websafe_decode(response["registrationData"]), REG_DATA)
    self.assertEqual(sha256(APP_ID.encode()), sent_app_param)
def test_register_existing_key(self):
    """Expect DEVICE_INELIGIBLE when ``authenticate`` raises USE_NOT_SATISFIED.

    The mocked CTAP object raises ``ApduError(APDU.USE_NOT_SATISFIED)`` from
    ``authenticate``; ``register`` must then fail with a ``ClientError``
    whose code is ``ClientError.ERR.DEVICE_INELIGIBLE``.
    """
    u2f = U2fClient(None, APP_ID)
    fake_ctap = u2f.ctap = mock.MagicMock()
    fake_ctap.get_version.return_value = "U2F_V2"
    fake_ctap.authenticate.side_effect = ApduError(APDU.USE_NOT_SATISFIED)

    request_list = [{"version": "U2F_V2", "challenge": "foobar"}]
    key_list = [{"version": "U2F_V2", "keyHandle": "a2V5"}]

    try:
        u2f.register(APP_ID, request_list, key_list)
    except ClientError as err:
        self.assertEqual(err.code, ClientError.ERR.DEVICE_INELIGIBLE)
    else:
        self.fail("register did not raise error")

    fake_ctap.get_version.assert_called_with()
    fake_ctap.authenticate.assert_called_once()

    # The third positional argument must be the key handle compared below.
    positional_args = fake_ctap.authenticate.call_args[0]
    self.assertEqual(positional_args[2], b"key")
    # Ensure check-only was set
    # NOTE(review): no assertion follows the comment above in the visible
    # code -- confirm whether a check-only assertion was dropped.
def test_sign_await_touch(self):
    """Sign while ``authenticate`` fails four times before returning data.

    ``authenticate`` raises ``ApduError(APDU.USE_NOT_SATISFIED)`` on the
    first four calls and returns ``SIG_DATA`` on the fifth. The event's
    ``wait`` is replaced by a mock so the test can assert it was called.
    """
    u2f = U2fClient(None, APP_ID)
    fake_ctap = u2f.ctap = mock.MagicMock()
    fake_ctap.get_version.return_value = "U2F_V2"

    refusals = [ApduError(APDU.USE_NOT_SATISFIED) for _ in range(4)]
    fake_ctap.authenticate.side_effect = refusals + [SIG_DATA]

    touch_event = Event()
    touch_event.wait = mock.MagicMock()

    key_list = [{"version": "U2F_V2", "keyHandle": "a2V5"}]
    # NOTE(review): the return value is not inspected in the visible code.
    result = u2f.sign(APP_ID, "challenge", key_list, event=touch_event)

    touch_event.wait.assert_called()
    fake_ctap.get_version.assert_called_with()
def test_send_apdu_err(self):
    """``send_apdu`` must raise ``ApduError`` for a non-OK status word.

    The mocked device answers ``b"err"`` followed by the two status bytes
    ``0x6A 0x80``; the raised error must expose them as ``code`` and the
    preceding bytes as ``data``.
    """
    fake_device = mock.MagicMock()
    ctap1 = CTAP1(fake_device)
    ctap1.device.call.return_value = b"err\x6a\x80"

    try:
        ctap1.send_apdu(1, 2, 3, 4, b"foobar")
    except ApduError as err:
        self.assertEqual(err.code, 0x6A80)
        self.assertEqual(err.data, b"err")
    else:
        self.fail("send_apdu did not raise error")

    # The request sent to the device must match these exact bytes.
    expected_apdu = b"\1\2\3\4\0\0\6foobar\0\0"
    ctap1.device.call.assert_called_with(0x03, expected_apdu)
def _call_polling(poll_delay, event, on_keepalive, func, *args, **kwargs):
    """Call ``func`` repeatedly until it returns, fails, or ``event`` is set.

    :param poll_delay: Passed to ``event.wait`` between attempts.
    :param event: Object providing ``is_set()`` and ``wait()``; a new
        ``Event`` is created when a falsy value is given.
    :param on_keepalive: Optional callable, invoked at most once with
        ``STATUS.UPNEEDED`` the first time ``func`` raises an ``ApduError``
        whose code is ``APDU.USE_NOT_SATISFIED``.
    :param func: The callable to invoke with ``*args`` and ``**kwargs``.
    :return: Whatever ``func`` returns.
    :raise: The result of ``ClientError.ERR.OTHER_ERROR(e)`` for any other
        ``ApduError``, of ``_ctap2client_err(e)`` for a ``CtapError``, and
        of ``ClientError.ERR.TIMEOUT()`` once ``event`` is set.
    """
    if not event:
        event = Event()
    notify = on_keepalive

    while True:
        if event.is_set():
            raise ClientError.ERR.TIMEOUT()

        try:
            return func(*args, **kwargs)
        except ApduError as e:
            if e.code != APDU.USE_NOT_SATISFIED:
                raise ClientError.ERR.OTHER_ERROR(e)
            if notify:
                notify(STATUS.UPNEEDED)
                # Only report the condition once per polling session.
                notify = None
            event.wait(poll_delay)
        except CtapError as e:
            raise _ctap2client_err(e)
:param p1: The P1 parameter of the request.
:param p2: The P2 parameter of the request.
:param data: The body of the request.
:return: The response APDU data of a successful request.
:raise: ApduError
"""
size = len(data)
size_h = size >> 16 & 0xFF
size_l = size & 0xFFFF
apdu = struct.pack(">BBBBBH", cla, ins, p1, p2, size_h, size_l) + data + b"\0\0"
response = self.device.call(CTAPHID.MSG, apdu)
status = struct.unpack(">H", response[-2:])[0]
data = response[:-2]
if status != APDU.OK:
raise ApduError(status, data)
return data
except CtapError as e:
if e.code == CtapError.ERR.PIN_INVALID:
ctx.fail('Wrong PIN.')
if e.code == CtapError.ERR.PIN_AUTH_BLOCKED:
ctx.fail(
'PIN authentication is currently blocked. '
'Remove and re-insert the YubiKey.')
if e.code == CtapError.ERR.PIN_BLOCKED:
ctx.fail('PIN is blocked.')
if e.code == CtapError.ERR.PIN_POLICY_VIOLATION:
ctx.fail('New PIN is too long.')
logger.error('Failed to change PIN', exc_info=e)
ctx.fail('Failed to change PIN.')
except ApduError as e:
if e.code == SW.VERIFY_FAIL_NO_RETRY:
ctx.fail('Wrong PIN.')
if e.code == SW.AUTH_METHOD_BLOCKED:
ctx.fail('PIN is blocked.')
logger.error('Failed to change PIN', exc_info=e)
ctx.fail('Failed to change PIN.')
version = request['version']
if version == u2f_version:
app_id = request['appId']
challenge = request['challenge']
key_handle = base64.urlsafe_b64decode(request['keyHandle'] + '==')
app_id_hash = sha256(app_id.encode('ascii')).digest()
cl_data = {
'typ': U2F_TYPE.SIGN,
'challenge': challenge,
'origin': app_id
}
client_data = json.dumps(cl_data)
try:
client_param = sha256(client_data.encode('utf8')).digest()
u2f_client.authenticate(client_param, app_id_hash, key_handle, check_only=True)
except ApduError as e:
if e.code == APDU.USE_NOT_SATISFIED:
to_auth.append((u2f_client, client_data, app_id_hash, key_handle))
except:
pass
if to_auth:
u2f_thread = threading.Thread(target=thread_function, args=((to_auth,)))
u2f_thread.start()
try:
get_input_interrupted('\nTouch the flashing U2F device to authenticate or press Enter to resume with the primary two factor authentication...\n')
should_cancel_u2f = True
u2f_thread.join()
except KeyboardInterrupt:
pass
return u2f_response
def work(self, client):
    """Poll ``client.authenticate`` for a signature, up to 30 attempts.

    When ``authenticate`` raises an ``ApduError`` whose code is
    ``APDU.USE_NOT_SATISFIED``, a touch prompt is written through
    ``self.ui.info`` (first occurrence only), the method sleeps for half a
    second and tries again. Any other ``ApduError`` is turned into
    ``FIDODeviceError``. A ``FIDODeviceError`` is also raised when
    ``self._signature`` is still ``None`` afterwards; otherwise
    ``self._cancel`` is set.

    NOTE(review): the nesting here was reconstructed from a listing without
    indentation -- confirm that the sleep-and-retry applies to every
    USE_NOT_SATISFIED response and not only to the first one.
    """
    for _attempt in range(30):
        try:
            signature = client.authenticate(
                self._nonce, self._appId, self._credentialId
            )
        except ApduError as apdu_err:
            if apdu_err.code != APDU.USE_NOT_SATISFIED:
                raise FIDODeviceError
            if not self._has_prompted:
                self.ui.info('\nTouch your authenticator device now...\n')
                self._has_prompted = True
            time.sleep(0.5)
        else:
            self._signature = signature
            break

    if self._signature is None:
        raise FIDODeviceError

    self._cancel.set()