Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ifrm = overlay_struct(buf, desc_init)
bcnt = ifrm.bcnt
data = ifrm.data
datalen = len(data)
seq = 0
if ifrm.cmd & _TYPE_MASK == _TYPE_CONT:
# unexpected cont packet, abort current msg
if __debug__:
log.warning(__name__, "_TYPE_CONT")
return None
if datalen < bcnt:
databuf = bytearray(bcnt)
utils.memcpy(databuf, 0, data, 0, bcnt)
data = databuf
else:
data = data[:bcnt]
while datalen < bcnt:
buf = await read
cfrm = overlay_struct(buf, desc_cont)
if cfrm.seq == _CMD_INIT:
# _CMD_INIT frame, cancels current channel
ifrm = overlay_struct(buf, desc_init)
data = ifrm.data[: ifrm.bcnt]
break
if cfrm.cid != ifrm.cid:
if ifrm.cid == 0 or ((ifrm.cid == _CID_BROADCAST) and (ifrm.cmd != _CMD_INIT)):
# CID 0 is reserved for future use and _CID_BROADCAST is reserved for channel allocation
await send_cmd(cmd_error(ifrm.cid, _ERR_INVALID_CID), iface)
return None
if bcnt > _MAX_U2FHID_MSG_PAYLOAD_LEN:
# invalid payload length, abort current msg
if __debug__:
log.warning(__name__, "_MAX_U2FHID_MSG_PAYLOAD_LEN")
await send_cmd(cmd_error(ifrm.cid, _ERR_INVALID_LEN), iface)
return None
if datalen < bcnt:
databuf = bytearray(bcnt)
utils.memcpy(databuf, 0, data, 0, bcnt)
data = databuf
else:
data = data[:bcnt]
while datalen < bcnt:
buf = await loop.race(read, loop.sleep(_CTAP_HID_TIMEOUT_MS * 1000))
if not isinstance(buf, (bytes, bytearray)):
await send_cmd(cmd_error(ifrm.cid, _ERR_MSG_TIMEOUT), iface)
return None
cfrm = overlay_struct(buf, desc_cont)
if cfrm.seq == _CMD_INIT:
# _CMD_INIT frame, cancels current channel
break
# sign the digest and convert to der
sig = nist256p1.sign(_U2F_ATT_PRIV_KEY, dig, False)
sig = der.encode_seq((sig[1:33], sig[33:]))
# pack to a response
buf, resp = make_struct(
resp_cmd_register(len(keybuf) + len(keybase), len(_U2F_ATT_CERT), len(sig))
)
resp.registerId = _U2F_REGISTER_ID
utils.memcpy(resp.pubKey, 0, pubkey, 0, len(pubkey))
resp.keyHandleLen = len(keybuf) + len(keybase)
utils.memcpy(resp.keyHandle, 0, keybuf, 0, len(keybuf))
utils.memcpy(resp.keyHandle, len(keybuf), keybase, 0, len(keybase))
utils.memcpy(resp.cert, 0, _U2F_ATT_CERT, 0, len(_U2F_ATT_CERT))
utils.memcpy(resp.sig, 0, sig, 0, len(sig))
resp.status = _SW_NO_ERROR
return buf
if len(rsig.L) > 127:
raise ValueError("Too large")
# Manual serialization as the generic purpose serialize.dump_msg_gc
# is more memory intensive which is not desired in the range proof section.
# BP: V, A, S, T1, T2, taux, mu, L, R, a, b, t
# Commitment vector V is not serialized
# Vector size under 127 thus varint occupies 1 B
buff_size = 32 * (9 + 2 * (len(rsig.L))) + 2
buff = bytearray(buff_size)
utils.memcpy(buff, 0, rsig.A, 0, 32)
utils.memcpy(buff, 32, rsig.S, 0, 32)
utils.memcpy(buff, 32 * 2, rsig.T1, 0, 32)
utils.memcpy(buff, 32 * 3, rsig.T2, 0, 32)
utils.memcpy(buff, 32 * 4, rsig.taux, 0, 32)
utils.memcpy(buff, 32 * 5, rsig.mu, 0, 32)
buff[32 * 6] = len(rsig.L)
offset = 32 * 6 + 1
for x in rsig.L:
utils.memcpy(buff, offset, x, 0, 32)
offset += 32
buff[offset] = len(rsig.R)
offset += 1
for x in rsig.R:
utils.memcpy(buff, offset, x, 0, 32)
offset += 32
) -> bytes:
"""
Creates an unique-purpose key
"""
key_buff = BUILD_KEY_BUFFER # bytearray(32 + 12 + 4) # key + disc + index
utils.ensure(len(secret) == 32, "Invalid key length")
utils.ensure(len(discriminator) <= 12, "Disc too long")
offset = 32
utils.memcpy(key_buff, 0, secret, 0, 32)
for i in range(32, len(key_buff)):
key_buff[i] = 0
if discriminator is not None:
utils.memcpy(key_buff, offset, discriminator, 0, len(discriminator))
offset += len(discriminator)
if index is not None:
# dump_uvarint_b_into, saving import
shifted = True
while shifted:
shifted = index >> 7
key_buff[offset] = (index & 0x7F) | (0x80 if shifted else 0x00)
offset += 1
index = shifted
return crypto.keccak_2hash(key_buff, out)
dig.update(pubkey) # uint8_t pubKey[65];
dig = dig.digest()
# sign the digest and convert to der
sig = nist256p1.sign(_U2F_ATT_PRIV_KEY, dig, False)
sig = der.encode_seq((sig[1:33], sig[33:]))
# pack to a response
buf, resp = make_struct(
resp_cmd_register(len(keybuf) + len(keybase), len(_U2F_ATT_CERT), len(sig))
)
resp.registerId = _U2F_REGISTER_ID
utils.memcpy(resp.pubKey, 0, pubkey, 0, len(pubkey))
resp.keyHandleLen = len(keybuf) + len(keybase)
utils.memcpy(resp.keyHandle, 0, keybuf, 0, len(keybuf))
utils.memcpy(resp.keyHandle, len(keybuf), keybase, 0, len(keybase))
utils.memcpy(resp.cert, 0, _U2F_ATT_CERT, 0, len(_U2F_ATT_CERT))
utils.memcpy(resp.sig, 0, sig, 0, len(sig))
resp.status = _SW_NO_ERROR
return buf
for x in rsig.L:
utils.memcpy(buff, offset, x, 0, 32)
offset += 32
buff[offset] = len(rsig.R)
offset += 1
for x in rsig.R:
utils.memcpy(buff, offset, x, 0, 32)
offset += 32
utils.memcpy(buff, offset, rsig.a, 0, 32)
offset += 32
utils.memcpy(buff, offset, rsig.b, 0, 32)
offset += 32
utils.memcpy(buff, offset, rsig.t, 0, 32)
return buff
# Vector size under 127 thus varint occupies 1 B
buff_size = 32 * (9 + 2 * (len(rsig.L))) + 2
buff = bytearray(buff_size)
utils.memcpy(buff, 0, rsig.A, 0, 32)
utils.memcpy(buff, 32, rsig.S, 0, 32)
utils.memcpy(buff, 32 * 2, rsig.T1, 0, 32)
utils.memcpy(buff, 32 * 3, rsig.T2, 0, 32)
utils.memcpy(buff, 32 * 4, rsig.taux, 0, 32)
utils.memcpy(buff, 32 * 5, rsig.mu, 0, 32)
buff[32 * 6] = len(rsig.L)
offset = 32 * 6 + 1
for x in rsig.L:
utils.memcpy(buff, offset, x, 0, 32)
offset += 32
buff[offset] = len(rsig.R)
offset += 1
for x in rsig.R:
utils.memcpy(buff, offset, x, 0, 32)
offset += 32
utils.memcpy(buff, offset, rsig.a, 0, 32)
offset += 32
utils.memcpy(buff, offset, rsig.b, 0, 32)
offset += 32
utils.memcpy(buff, offset, rsig.t, 0, 32)
return buff
# Manual serialization as the generic purpose serialize.dump_msg_gc
# is more memory intensive which is not desired in the range proof section.
# BP: V, A, S, T1, T2, taux, mu, L, R, a, b, t
# Commitment vector V is not serialized
# Vector size under 127 thus varint occupies 1 B
buff_size = 32 * (9 + 2 * (len(rsig.L))) + 2
buff = bytearray(buff_size)
utils.memcpy(buff, 0, rsig.A, 0, 32)
utils.memcpy(buff, 32, rsig.S, 0, 32)
utils.memcpy(buff, 32 * 2, rsig.T1, 0, 32)
utils.memcpy(buff, 32 * 3, rsig.T2, 0, 32)
utils.memcpy(buff, 32 * 4, rsig.taux, 0, 32)
utils.memcpy(buff, 32 * 5, rsig.mu, 0, 32)
buff[32 * 6] = len(rsig.L)
offset = 32 * 6 + 1
for x in rsig.L:
utils.memcpy(buff, offset, x, 0, 32)
offset += 32
buff[offset] = len(rsig.R)
offset += 1
for x in rsig.R:
utils.memcpy(buff, offset, x, 0, 32)
offset += 32
utils.memcpy(buff, offset, rsig.a, 0, 32)
def _serialize_ecdh(ecdh_info, v2=False):
"""
Serializes ECDH according to the current format defined by the hard fork version
or the signature format respectively.
"""
if v2:
# In HF10 the amount is serialized to 8B and mask is deterministic
ecdh_info_bin = bytearray(8)
ecdh_info_bin[:] = ecdh_info.amount[0:8]
return ecdh_info_bin
else:
ecdh_info_bin = bytearray(64)
utils.memcpy(ecdh_info_bin, 0, ecdh_info.mask, 0, 32)
utils.memcpy(ecdh_info_bin, 32, ecdh_info.amount, 0, 32)
return ecdh_info_bin