Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sort_keys=sort_keys, parent_id=parent_id)
session.add(new_wallet)
session.commit()
new_wallet_id = new_wallet.id
if scheme == 'bip44':
mk = HDWalletKey.from_key(key=key, name=name, session=session, wallet_id=new_wallet_id, network=network,
account_id=account_id, purpose=purpose, key_type='bip32')
if mk.depth > 4:
raise WalletError("Cannot create new wallet with main key of depth 5 or more")
new_wallet.main_key_id = mk.key_id
session.commit()
w = cls(new_wallet_id, databasefile=databasefile, main_key_object=mk.key())
if mk.depth == 0:
nw = Network(network)
networkcode = nw.bip44_cointype
path = ["%d'" % purpose, "%s'" % networkcode]
w._create_keys_from_path(mk, path, name=name, wallet_id=new_wallet_id, network=network, session=session,
account_id=account_id, purpose=purpose, basepath="m")
w.new_account(account_id=account_id)
elif scheme == 'multisig':
w = cls(new_wallet_id, databasefile=databasefile)
elif scheme == 'single':
mk = HDWalletKey.from_key(key=key, name=name, session=session, wallet_id=new_wallet_id, network=network,
account_id=account_id, purpose=purpose, key_type='single')
new_wallet.main_key_id = mk.key_id
session.commit()
w = cls(new_wallet_id, databasefile=databasefile, main_key_object=mk.key())
else:
raise WalletError("Wallet with scheme %s not supported at the moment" % scheme)
if not len(poppath):
new_path.append(pi)
else:
new_path.append(poppath.pop())
new_path = new_path[::-1]
else:
new_path = deepcopy(path)
# Replace variable names in path with corresponding values
# network, account_id, _ = self._get_account_defaults(network, account_id)
script_type_id = 1 if witness_type == 'p2sh-segwit' else 2
var_defaults = {
'network': network,
'account': account_id,
'purpose': purpose,
'coin_type': Network(network).bip44_cointype,
'script_type': script_type_id,
'cosigner_index': cosigner_id,
'change': change,
'address_index': address_index
}
npath = new_path
for i, pi in enumerate(new_path):
if not isinstance(pi, TYPE_TEXT):
pi = str(pi)
if pi in "mM":
continue
hardened = False
varname = pi
if pi[-1:] == "'" or (pi[-1:] in "HhPp" and pi[:-1].isdigit()):
varname = pi[:-1]
hardened = True
'public_key_uncompressed': wk.key().key.public_uncompressed(),
'public_key': wk.key().key.public(),
'depth': wk.depth,
'path': wk.path
})
if self.sort_keys:
public_keys.sort(key=lambda x: x['public_key'])
public_key_list = [x['public_key'] for x in public_keys]
public_key_ids = [str(x['key_id']) for x in public_keys]
depths = [x['depth'] for x in public_keys]
depth = 5 if len(set(depths)) != 1 else depths[0]
# Calculate redeemscript and address and add multisig key to database
redeemscript = serialize_multisig_redeemscript(public_key_list, n_required=self.multisig_n_required)
address = pubkeyhash_to_addr(script_to_pubkeyhash(redeemscript),
versionbyte=Network(network).prefix_address_p2sh)
if len(set([x['path'] for x in public_keys])) == 1:
path = public_keys[0]['path']
else:
path = "multisig-%d-of-" % self.multisig_n_required + '/'.join(public_key_ids)
if not name:
name = "Multisig Key " + '/'.join(public_key_ids)
multisig_key = DbKey(
name=name, wallet_id=self.wallet_id, purpose=self.purpose, account_id=account_id,
depth=depth, change=change, address_index=0, parent_id=0, is_private=False, path=path,
public=to_hexstring(redeemscript), wif='multisig-%s' % address, address=address,
key_type='multisig', network_name=network)
self._session.add(multisig_key)
self._session.commit()
for child_id in public_key_ids:
self._session.add(DbKeyMultisigChildren(key_order=public_key_ids.index(child_id),
parent_id=multisig_key.id, child_id=child_id))
self._session = session
else:
dbinit = DbInit(databasefile=databasefile)
self._session = dbinit.session
self._engine = dbinit.engine
self.databasefile = databasefile
if isinstance(wallet, int) or wallet.isdigit():
db_wlt = self._session.query(DbWallet).filter_by(id=wallet).scalar()
else:
db_wlt = self._session.query(DbWallet).filter_by(name=wallet).scalar()
if db_wlt:
self._dbwallet = db_wlt
self.wallet_id = db_wlt.id
self._name = db_wlt.name
self._owner = db_wlt.owner
self.network = Network(db_wlt.network_name)
self.purpose = db_wlt.purpose
self.scheme = db_wlt.scheme
self._balance = None
self._balances = []
self.main_key_id = db_wlt.main_key_id
self.main_key = None
self.default_account_id = 0
self.multisig_n_required = db_wlt.multisig_n_required
self.multisig_compressed = None
co_sign_wallets = self._session.query(DbWallet).\
filter(DbWallet.parent_id == self.wallet_id).order_by(DbWallet.name).all()
self.cosigner = [HDWallet(w.id, databasefile=databasefile) for w in co_sign_wallets]
self.sort_keys = db_wlt.sort_keys
if main_key_object:
self.main_key = HDWalletKey(self.main_key_id, session=self._session, hdkey_object=main_key_object)
elif db_wlt.main_key_id:
account_id = qr.account_id + 1
if not name:
name = 'Account #%d' % account_id
if self.keys(account_id=account_id, depth=3, network=network):
raise WalletError("Account with ID %d already exists for this wallet" % account_id)
if name in [k.name for k in self.keys(network=network)]:
raise WalletError("Account or key with name %s already exists for this wallet" % name)
# Get root key of new account
res = self.keys(depth=2, network=network)
if not res:
try:
purposekey = self.key(self.keys(depth=1)[0].id)
bip44_cointype = Network(network).bip44_cointype
duplicate_cointypes = [Network(x).network_name for x in self.network_list() if
Network(x).bip44_cointype == bip44_cointype]
if duplicate_cointypes:
raise WalletError("Can not create new account for network %s with same BIP44 cointype: %s" %
(network, duplicate_cointypes))
accrootkey_obj = self._create_keys_from_path(
purposekey, ["%s'" % str(bip44_cointype)], name=network, wallet_id=self.wallet_id,
account_id=account_id, network=network, purpose=self.purpose, basepath=purposekey.path,
session=self._session)
except IndexError:
raise WalletError("No key found for this wallet_id and purpose. Can not create new"
"account for this wallet, is there a master key imported?")
else:
accrootkey = res[0]
accrootkey_obj = self.key(accrootkey.id)
# Create new account addresses and return main account key
newpath = [str(account_id) + "'"]
def network_change(self, new_network):
    """
    Switch this key over to a different network.

    The given network name is wrapped in a Network object, which then
    replaces the value currently stored in self.network.

    :param new_network: Name of the network to switch to
    :type new_network: str

    :return bool: Always True
    """
    replacement = Network(new_network)
    self.network = replacement
    return True
compressed = False
chain = b'\0' * 32
key = import_key.private_byte
key_type = 'private'
else:
kf = get_key_format(import_key)
if kf['format'] == 'address':
raise BKeyError("Can not create HDKey object from address")
if len(kf['script_types']) == 1:
self.script_type = kf['script_types'][0]
if len(kf['witness_types']) == 1 and not witness_type:
witness_type = kf['witness_types'][0]
encoding = get_encoding_from_witness(witness_type)
if len(kf['multisig']) == 1:
multisig = kf['multisig'][0]
network = Network(check_network_and_key(import_key, network, kf["networks"]))
if kf['format'] in ['hdkey_private', 'hdkey_public']:
bkey = change_base(import_key, 58, 256)
# Derive key, chain, depth, child_index and fingerprint part from extended key WIF
if ord(bkey[45:46]):
is_private = False
key = bkey[45:78]
else:
key = bkey[46:78]
depth = ord(bkey[4:5])
parent_fingerprint = bkey[5:9]
child_index = int(change_base(bkey[9:13], 256, 10))
chain = bkey[13:45]
# chk = bkey[78:82]
elif kf['format'] == 'address':
da = deserialize_address(import_key)
key = da['public_key_hash']
raise BKeyError("Can not create Key object from address")
self.key_format = kf["format"]
networks_extracted = kf["networks"]
self.is_private = is_private
if is_private is None:
if kf['is_private']:
self.is_private = True
elif kf['is_private'] is None:
raise BKeyError("Could not determine if key is private or public")
else:
self.is_private = False
network_name = None
if network is not None:
self.network = network
if not isinstance(network, Network):
self.network = Network(network)
network_name = self.network.name
network = check_network_and_key(import_key, network_name, networks_extracted)
self.network = Network(network)
if self.key_format == "wif_protected":
# TODO: return key as byte (?)
# FIXME: Key format is changed so old 'wif_protected' is forgotten
import_key, self.key_format = self._bip38_decrypt(import_key, passphrase)
if not self.is_private:
self.secret = None
pub_key = to_hexstring(import_key)
if len(pub_key) == 130:
self.public_uncompressed_hex = pub_key
self._x = pub_key[2:66]
self._y = pub_key[66:130]
order_by(DbKey.account_id.desc()).first()
if qr:
account_id = qr.account_id + 1
if not name:
name = 'Account #%d' % account_id
if self.keys(account_id=account_id, depth=3, network=network):
raise WalletError("Account with ID %d already exists for this wallet" % account_id)
if name in [k.name for k in self.keys(network=network)]:
raise WalletError("Account or key with name %s already exists for this wallet" % name)
# Get root key of new account
res = self.keys(depth=2, network=network)
if not res:
try:
purposekey = self.key(self.keys(depth=1)[0].id)
bip44_cointype = Network(network).bip44_cointype
duplicate_cointypes = [Network(x).network_name for x in self.network_list() if
Network(x).bip44_cointype == bip44_cointype]
if duplicate_cointypes:
raise WalletError("Can not create new account for network %s with same BIP44 cointype: %s" %
(network, duplicate_cointypes))
accrootkey_obj = self._create_keys_from_path(
purposekey, ["%s'" % str(bip44_cointype)], name=network, wallet_id=self.wallet_id,
account_id=account_id, network=network, purpose=self.purpose, basepath=purposekey.path,
session=self._session)
except IndexError:
raise WalletError("No key found for this wallet_id and purpose. Can not create new"
"account for this wallet, is there a master key imported?")
else:
accrootkey = res[0]
accrootkey_obj = self.key(accrootkey.id)
rawtx = to_bytes(rawtx)
version = rawtx[0:4][::-1]
coinbase = False
flag = None
witness_type = 'legacy'
cursor = 4
if rawtx[4:5] == b'\0':
flag = rawtx[5:6]
if flag == b'\1':
witness_type = 'segwit'
cursor += 2
n_inputs, size = varbyteint_to_int(rawtx[cursor:cursor+9])
cursor += size
inputs = []
if not isinstance(network, Network):
network = Network(network)
for n in range(0, n_inputs):
inp_hash = rawtx[cursor:cursor + 32][::-1]
if not len(inp_hash):
raise TransactionError("Input transaction hash not found. Probably malformed raw transaction")
if inp_hash == 32 * b'\0':
coinbase = True
output_n = rawtx[cursor + 32:cursor + 36][::-1]
cursor += 36
unlocking_script_size, size = varbyteint_to_int(rawtx[cursor:cursor + 9])
cursor += size
unlocking_script = rawtx[cursor:cursor + unlocking_script_size]
inp_type = 'legacy'
if witness_type == 'segwit' and not unlocking_script_size:
inp_type = 'segwit'
cursor += unlocking_script_size
sequence_number = rawtx[cursor:cursor + 4]