Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def sign_tx(ctx, msg: EosSignTx, keychain):
if msg.chain_id is None:
raise wire.DataError("No chain id")
if msg.header is None:
raise wire.DataError("No header")
if msg.num_actions is None or msg.num_actions == 0:
raise wire.DataError("No actions")
await paths.validate_path(ctx, validate_full_path, keychain, msg.address_n, CURVE)
node = keychain.derive(msg.address_n)
sha = HashWriter(sha256())
await _init(ctx, sha, msg)
await _actions(ctx, sha, msg.num_actions)
writers.write_variant32(sha, 0)
writers.write_bytes(sha, bytearray(32))
digest = sha.get_digest()
signature = secp256k1.sign(
def _public_key_to_wif(pub_key: bytes) -> str:
if pub_key[0] == 0x04 and len(pub_key) == 65:
head = b"\x03" if pub_key[64] & 0x01 else b"\x02"
compressed_pub_key = head + pub_key[1:33]
elif pub_key[0] in [0x02, 0x03] and len(pub_key) == 33:
compressed_pub_key = pub_key
else:
raise wire.DataError("invalid public key")
return base58_encode("EOS", "", compressed_pub_key)
def check(msg: EthereumSignTx):
if msg.tx_type not in [1, 6, None]:
raise wire.DataError("tx_type out of bounds")
if msg.chain_id < 0:
raise wire.DataError("chain_id out of bounds")
if msg.data_length > 0:
if not msg.data_initial_chunk:
raise wire.DataError("Data length provided, but no initial chunk")
# Our encoding only supports transactions up to 2^24 bytes. To
# prevent exceeding the limit we use a stricter limit on data length.
if msg.data_length > 16000000:
raise wire.DataError("Data length exceeds limit")
if len(msg.data_initial_chunk) > msg.data_length:
raise wire.DataError("Invalid size of initial chunk")
# safety checks
if not check_gas(msg) or not check_to(msg):
def _get_address_by_tag(address_hash):
prefixes = ["tz1", "tz2", "tz3"]
tag = int(address_hash[0])
if 0 <= tag < len(prefixes):
return helpers.base58_encode_check(address_hash[1:], prefix=prefixes[tag])
raise wire.DataError("Invalid tag in address hash")
def validate_path(self, checked_path: list, checked_curve: str) -> None:
for curve, *path in self.namespaces:
if path == checked_path[: len(path)] and curve == checked_curve:
if "ed25519" in curve and not _path_hardened(checked_path):
break
return
raise wire.DataError("Forbidden key path")
def check(msg: EthereumSignTx):
if msg.tx_type not in [1, 6, None]:
raise wire.DataError("tx_type out of bounds")
if msg.chain_id < 0:
raise wire.DataError("chain_id out of bounds")
if msg.data_length > 0:
if not msg.data_initial_chunk:
raise wire.DataError("Data length provided, but no initial chunk")
# Our encoding only supports transactions up to 2^24 bytes. To
# prevent exceeding the limit we use a stricter limit on data length.
if msg.data_length > 16000000:
raise wire.DataError("Data length exceeds limit")
if len(msg.data_initial_chunk) > msg.data_length:
raise wire.DataError("Invalid size of initial chunk")
# safety checks
if not check_gas(msg) or not check_to(msg):
raise wire.DataError("Safety check failed")
if script_type == InputScriptType.SPENDADDRESS and coin.xpub_magic is not None:
node_xpub = node.serialize_public(coin.xpub_magic)
elif (
coin.segwit
and script_type == InputScriptType.SPENDP2SHWITNESS
and coin.xpub_magic_segwit_p2sh is not None
):
node_xpub = node.serialize_public(coin.xpub_magic_segwit_p2sh)
elif (
coin.segwit
and script_type == InputScriptType.SPENDWITNESS
and coin.xpub_magic_segwit_native is not None
):
node_xpub = node.serialize_public(coin.xpub_magic_segwit_native)
else:
raise wire.DataError("Invalid combination of coin and script_type")
pubkey = node.public_key()
if pubkey[0] == 1:
pubkey = b"\x00" + pubkey[1:]
node_type = HDNodeType(
depth=node.depth(),
child_num=node.child_num(),
fingerprint=node.fingerprint(),
chain_code=node.chain_code(),
public_key=pubkey,
)
if msg.show_display:
await layout.show_pubkey(ctx, pubkey)
return PublicKey(node=node_type, xpub=node_xpub)
def check(msg: EthereumSignTx):
if msg.tx_type not in [1, 6, None]:
raise wire.DataError("tx_type out of bounds")
if msg.chain_id < 0:
raise wire.DataError("chain_id out of bounds")
if msg.data_length > 0:
if not msg.data_initial_chunk:
raise wire.DataError("Data length provided, but no initial chunk")
# Our encoding only supports transactions up to 2^24 bytes. To
# prevent exceeding the limit we use a stricter limit on data length.
if msg.data_length > 16000000:
raise wire.DataError("Data length exceeds limit")
if len(msg.data_initial_chunk) > msg.data_length:
raise wire.DataError("Invalid size of initial chunk")
# safety checks
if not check_gas(msg) or not check_to(msg):
raise wire.DataError("Safety check failed")
):
node_xpub = node.serialize_public(coin.xpub_magic)
elif (
coin.segwit
and script_type == InputScriptType.SPENDP2SHWITNESS
and coin.xpub_magic_segwit_p2sh is not None
):
node_xpub = node.serialize_public(coin.xpub_magic_segwit_p2sh)
elif (
coin.segwit
and script_type == InputScriptType.SPENDWITNESS
and coin.xpub_magic_segwit_native is not None
):
node_xpub = node.serialize_public(coin.xpub_magic_segwit_native)
else:
raise wire.DataError("Invalid combination of coin and script_type")
pubkey = node.public_key()
if pubkey[0] == 1:
pubkey = b"\x00" + pubkey[1:]
node_type = HDNodeType(
depth=node.depth(),
child_num=node.child_num(),
fingerprint=node.fingerprint(),
chain_code=node.chain_code(),
public_key=pubkey,
)
if msg.show_display:
await layout.show_pubkey(ctx, pubkey)
return PublicKey(node=node_type, xpub=node_xpub)
def _get_address_from_contract(address):
if address.tag == TezosContractType.Implicit:
return _get_address_by_tag(address.hash)
elif address.tag == TezosContractType.Originated:
return helpers.base58_encode_check(
address.hash[:-1], prefix=helpers.TEZOS_ORIGINATED_ADDRESS_PREFIX
)
raise wire.DataError("Invalid contract type")