Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def quantity_decoder(hex_str, allow_optional=False):
    """Decode a "0x"-prefixed hexadecimal string into an integer.

    Args:
        hex_str: The encoded quantity, e.g. ``"0x1f"``. May be ``None`` only
            when ``allow_optional`` is true.
        allow_optional: When true, a ``None`` input is passed through as
            ``None`` instead of being rejected.

    Returns:
        The decoded integer, or ``None`` for an allowed missing value.

    Raises:
        InvalidParams: If ``hex_str`` is not a string, lacks the ``"0x"``
            prefix, has no digits after the prefix, or contains any
            character that is not a hexadecimal digit.
    """
    if allow_optional and hex_str is None:
        return None
    # Non-string input must be reported as a parameter error rather than
    # escaping as an AttributeError from the .startswith() call below.
    if not isinstance(hex_str, str):
        raise InvalidParams("Invalid quantity encoding")
    # must start with "0x" and carry at least one digit after it
    if not hex_str.startswith("0x") or len(hex_str) < 3:
        raise InvalidParams("Invalid quantity encoding")
    digits = hex_str[2:]
    # int() on its own is too lenient for wire data: it tolerates "_"
    # separators and surrounding whitespace, so "0x1_0" or "0x1\n" would
    # decode silently. Check the digits explicitly before converting.
    if any(ch not in "0123456789abcdefABCDEF" for ch in digits):
        raise InvalidParams("Invalid quantity encoding")
    return int(digits, 16)
def full_shard_key_decoder(hex_str):
    """Decode a hex-encoded full shard key into an unsigned integer.

    The input is decoded with ``data_decoder``; the result must be exactly
    4 bytes long and is interpreted as a big-endian integer.

    Raises:
        InvalidParams: If the decoded value is not 4 bytes long.
    """
    raw_key = data_decoder(hex_str)
    if len(raw_key) == 4:
        return int.from_bytes(raw_key, byteorder="big")
    raise InvalidParams("Full shard id must be 4 bytes")
def add(x, y):
    """Return ``x + y``.

    Raises:
        exceptions.InvalidParams: If the operands do not support ``+``
            with each other (the underlying ``TypeError`` is translated).
    """
    try:
        total = x + y
    except TypeError:
        raise exceptions.InvalidParams('Type error')
    else:
        return total
def get_block_by_hash(block_hash: bytes):
    """Fetch a stored block by hash and return its converted result.

    The record is read from ``TBEARS_DB`` under the key
    ``DB_PREFIX_BLOCK + block_hash``, parsed as JSON and passed through
    ``get_block_result``.

    Raises:
        GenericJsonRpcServerError: With ``InvalidParams.code`` and an HTTP
            bad-request status if the lookup, the JSON parsing or the
            conversion raises any ``Exception``.
    """
    try:
        raw_block: bytes = TBEARS_DB.get(DB_PREFIX_BLOCK + block_hash)
        parsed_block = json.loads(raw_block)
        return get_block_result(parsed_block)
    except Exception:
        # Every failure on this path is reported to the client the same way.
        raise GenericJsonRpcServerError(
            code=InvalidParams.code,
            message=f"Can't get block information",
            http_status=status.HTTP_BAD_REQUEST)
def address_decoder(hex_str):
    """Decode a hex-encoded address into its raw bytes.

    The input is decoded with ``data_decoder``. The result is accepted only
    when it is 24 bytes long or empty.

    Raises:
        InvalidParams: If the decoded value has any other length.
    """
    decoded = data_decoder(hex_str)
    if len(decoded) in (24, 0):
        return decoded
    raise InvalidParams("Addresses must be 24 or 0 bytes long")
"networkId", quantity_decoder, self.master.env.quark_chain_config.NETWORK_ID
)
gas_token_id = get_data_default(
"gas_token_id", quantity_decoder, self.env.quark_chain_config.genesis_token
)
transfer_token_id = get_data_default(
"transfer_token_id",
quantity_decoder,
self.env.quark_chain_config.genesis_token,
)
if nonce is None:
raise InvalidParams("Missing nonce")
if not (v and r and s):
raise InvalidParams("Missing v, r, s")
if from_full_shard_key is None:
raise InvalidParams("Missing fromFullShardKey")
if to_full_shard_key is None:
to_full_shard_key = from_full_shard_key
evm_tx = EvmTransaction(
nonce,
gasprice,
startgas,
to,
value,
data_,
v=v,
r=r,
s=s,
)
network_id = get_data_default(
"networkId", quantity_decoder, self.master.env.quark_chain_config.NETWORK_ID
)
gas_token_id = get_data_default(
"gas_token_id", quantity_decoder, self.env.quark_chain_config.genesis_token
)
transfer_token_id = get_data_default(
"transfer_token_id",
quantity_decoder,
self.env.quark_chain_config.genesis_token,
)
if nonce is None:
raise InvalidParams("Missing nonce")
if not (v and r and s):
raise InvalidParams("Missing v, r, s")
if from_full_shard_key is None:
raise InvalidParams("Missing fromFullShardKey")
if to_full_shard_key is None:
to_full_shard_key = from_full_shard_key
evm_tx = EvmTransaction(
nonce,
gasprice,
startgas,
to,
value,
data_,
v=v,
async def subscribe(self, sub_type, full_shard_id, params=None, context=None):
    """Validate a subscription request and prepare its bookkeeping values.

    The full shard ID is decoded with ``shard_id_decoder`` and looked up in
    ``self.slave.shards``. For ``SUB_LOGS`` subscriptions the log request in
    ``params`` is parsed and a callable that builds a ``LogFilter`` from
    candidate blocks is prepared.

    NOTE(review): as visible here, the websocket, subscription id, manager
    and filter callable are computed but not used further and nothing is
    returned; the definition may be truncated. Confirm against the full
    source before relying on this.

    Raises:
        InvalidParams: If the shard ID cannot be decoded or no shard is
            registered for it.
    """
    assert context is not None

    full_shard_id = shard_id_decoder(full_shard_id)
    if full_shard_id is None:
        raise InvalidParams("Invalid full shard ID")

    shard = self.slave.shards.get(Branch(full_shard_id), None)
    if not shard:
        raise InvalidParams("Full shard ID not found")

    websocket = context["websocket"]
    sub_id = "0x" + uuid.uuid4().hex
    shard_subscription_manager = self.shard_subscription_managers[full_shard_id]

    extra = None
    if sub_type == SUB_LOGS:
        decoded_addresses, topics = _parse_log_request(params, address_decoder)
        addresses = [
            Address(decoded.recipient, full_shard_id)
            for decoded in decoded_addresses
        ]

        def extra(candidate_blocks):
            # Build the log filter lazily, once candidate blocks are known.
            return LogFilter.create_from_block_candidates(
                shard.state.db, addresses, topics, candidate_blocks
            )
def recipient_decoder(hex_str, allow_optional=False):
    """Decode a hex-encoded recipient into its raw bytes.

    When ``allow_optional`` is true, a ``None`` input is returned as
    ``None``. Otherwise the input is decoded with ``data_decoder`` and
    accepted only when the result is 20 bytes long or empty.

    Raises:
        InvalidParams: If the decoded value has any other length.
    """
    if hex_str is None and allow_optional:
        return None
    decoded = data_decoder(hex_str)
    if len(decoded) in (20, 0):
        return decoded
    raise InvalidParams("Addresses must be 20 or 0 bytes long")
gas_token_id = get_data_default(
"gas_token_id", quantity_decoder, self.env.quark_chain_config.genesis_token
)
transfer_token_id = get_data_default(
"transfer_token_id",
quantity_decoder,
self.env.quark_chain_config.genesis_token,
)
if nonce is None:
raise InvalidParams("Missing nonce")
if not (v and r and s):
raise InvalidParams("Missing v, r, s")
if from_full_shard_key is None:
raise InvalidParams("Missing fromFullShardKey")
if to_full_shard_key is None:
to_full_shard_key = from_full_shard_key
evm_tx = EvmTransaction(
nonce,
gasprice,
startgas,
to,
value,
data_,
v=v,
r=r,
s=s,
from_full_shard_key=from_full_shard_key,
to_full_shard_key=to_full_shard_key,