Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for index in range(msg.transactions_count):
progress.advance()
tx_ack = await request_transaction(ctx, tx_req, index)
tx_hash = hashlib.blake2b(
data=bytes(tx_ack.transaction), outlen=32
).digest()
tx_decoded = cbor.decode(tx_ack.transaction)
for i, input in enumerate(msg.inputs):
if not attested[i] and input.prev_hash == tx_hash:
attested[i] = True
outputs = tx_decoded[1]
amount = outputs[input.prev_index][1]
input_coins_sum += amount
if not all(attested):
raise wire.ProcessError(
"No tx data sent for input " + str(attested.index(False))
)
transaction = Transaction(
msg.inputs, msg.outputs, keychain, msg.protocol_magic, input_coins_sum
)
for i in msg.inputs:
await validate_path(ctx, validate_full_path, keychain, i.address_n, CURVE)
# sign the transaction bundle and prepare the result
tx_body, tx_hash = transaction.serialise_tx()
tx = CardanoSignedTx(tx_body=tx_body, tx_hash=tx_hash)
except ValueError as e:
if __debug__:
def derive(self, node_path: list) -> bip32.HDNode:
    """Derive the node addressed by ``node_path`` from this keychain's root.

    ``node_path`` must begin with ``self.path``; only the indices that
    follow that prefix are applied, one by one, to a clone of ``self.root``.

    Raises:
        wire.DataError: if ``node_path`` does not start with ``self.path``.
    """
    base_len = len(self.path)

    # refuse any path that lies outside of this keychain's own prefix
    if node_path[:base_len] != self.path:
        raise wire.DataError("Forbidden key path")

    # walk the remaining indices on a copy so the root itself is not modified
    child = self.root.clone()
    for index in node_path[base_len:]:
        child.derive_cardano(index)
    return child
async def backup_device(ctx, msg):
    """Walk the user through backing up the device mnemonic.

    Raises:
        wire.ProcessError: if the device is not initialized, or if storage
            reports that no backup is needed.
    """
    if not storage.is_initialized():
        raise wire.ProcessError("Device is not initialized")
    if not storage.needs_backup():
        raise wire.ProcessError("Seed already backed up")

    seed_words = mnemonic.restore()

    # the user is warned about mnemonic safety before anything is displayed
    await show_warning(ctx)

    # both flags are written before the words are shown for the first time
    storage.set_unfinished_backup(True)
    storage.set_backed_up()

    # keep showing the mnemonic until the random-word check succeeds
    confirmed = False
    while not confirmed:
        await show_mnemonic(ctx, seed_words)
        confirmed = await check_mnemonic(ctx, seed_words)
        if not confirmed:
            await show_wrong_entry(ctx)
def boot() -> None:
    """Register the EOS message handlers of this module with ``wire``.

    Both handlers are registered with the same key-path list: ``CURVE``
    followed by the hardened indices 44 and 194.
    """
    namespace = [[CURVE, HARDENED | 44, HARDENED | 194]]
    handlers = (
        (MessageType.EosGetPublicKey, "get_public_key"),
        (MessageType.EosSignTx, "sign_tx"),
    )
    for message_type, handler_name in handlers:
        wire.add(message_type, __name__, handler_name, namespace)
async def backup_device(ctx, msg):
    """Walk the user through backing up the device mnemonic.

    The mnemonic is shown and the user is asked to confirm a random word;
    this repeats until the confirmation succeeds.

    Raises:
        wire.ProcessError: if the device is not initialized, or if storage
            reports that no backup is needed.
    """
    if not storage.is_initialized():
        raise wire.ProcessError("Device is not initialized")
    if not storage.needs_backup():
        raise wire.ProcessError("Seed already backed up")

    words = mnemonic.restore()

    # warn user about mnemonic safety
    await show_warning(ctx)

    # both flags are written before the words are shown for the first time
    storage.set_unfinished_backup(True)
    storage.set_backed_up()

    while True:
        # show mnemonic and require confirmation of a random word
        await show_mnemonic(ctx, words)
        if await check_mnemonic(ctx, words):
            break
        # tell the user the entry was wrong before starting over, instead of
        # silently showing the mnemonic again
        await show_wrong_entry(ctx)
def bytes_from_address(address: str) -> bytes:
    """Convert a hex-encoded Ethereum address string into raw bytes.

    Accepted forms:
      * empty string            -> empty bytes
      * 40 hex characters       -> decoded as is
      * "0x"/"0X" + 40 hex chars -> prefix dropped, remainder decoded

    Raises:
        wire.ProcessError: if the length is none of 0, 40 or 42, or if a
            42-character address does not start with "0x" or "0X".
    """
    length = len(address)

    if length == 0:
        return bytes()
    if length == 40:
        return unhexlify(address)
    if length != 42:
        raise wire.ProcessError("Ethereum: Invalid address length")

    # 42 characters: the two extra ones must be the hex prefix
    if address[0:2] not in ("0x", "0X"):
        raise wire.ProcessError("Ethereum: invalid beginning of an address")
    return unhexlify(address[2:])
async def set_u2f_counter(ctx: wire.Context, msg: SetU2FCounter) -> Success:
    """Store a new U2F counter value after the user confirms it on screen.

    Raises:
        wire.ProcessError: if ``msg.u2f_counter`` is ``None``.

    Returns:
        A ``Success`` message once the counter has been written to storage.
    """
    counter = msg.u2f_counter
    if counter is None:
        raise wire.ProcessError("No value provided")

    # build the confirmation dialog showing the requested value
    prompt = Text("Set U2F counter", ui.ICON_CONFIG)
    prompt.normal("Do you really want to", "set the U2F counter")
    prompt.bold("to %d?" % counter)
    await require_confirm(ctx, prompt, code=ButtonRequestType.ProtectCall)

    # this line is only reached once the awaited confirmation has returned
    storage.device.set_u2f_counter(counter)
    return Success(message="U2F counter set")
node = keychain.derive(address_n, coin.curve_name)
seckey = node.private_key()
address = get_address(script_type, coin, node)
digest = message_digest(coin, message)
signature = secp256k1.sign(seckey, digest)
if script_type == SPENDADDRESS:
pass
elif script_type == SPENDP2SHWITNESS:
signature = bytes([signature[0] + 4]) + signature[1:]
elif script_type == SPENDWITNESS:
signature = bytes([signature[0] + 8]) + signature[1:]
else:
raise wire.ProcessError("Unsupported script type")
return MessageSignature(address=address, signature=signature)
return require_confirm(ctx, text)
if not msg.remove and has_wipe_code:
text = Text("Change wipe code", ui.ICON_CONFIG)
text.normal("Do you really want to")
text.bold("change the wipe code?")
return require_confirm(ctx, text)
if not msg.remove and not has_wipe_code:
text = Text("Set wipe code", ui.ICON_CONFIG)
text.normal("Do you really want to")
text.bold("set the wipe code?")
return require_confirm(ctx, text)
# Removing non-existing wipe code.
raise wire.ProcessError("Wipe code protection is already disabled")