Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:type session: sqlalchemy.orm.session.Session
:param name: Name for generated keys. Leave empty for default
:type name: str
:param basepath: Basepath of main parent key
:type basepath: str
:param change: Change = 1, or payment = 0. Default is 0.
:type change: int
:param purpose: Purpose field according to BIP32 definition, default is 44 for BIP44.
:type purpose: int
:return HDWalletKey:
"""
# Initial checks and settings
if not isinstance(parent, HDWalletKey):
raise WalletError("Parent must be of type 'HDWalletKey'")
if not isinstance(path, list):
raise WalletError("Path must be of type 'list'")
if len(basepath) and basepath[-1] != "/":
basepath += "/"
nk = parent
ck = nk.key()
# Check for closest ancestor in wallet
spath = basepath + '/'.join(path)
rkey = None
while spath and not rkey:
rkey = self._session.query(DbKey).filter_by(wallet_id=wallet_id, path=spath).first()
spath = '/'.join(spath.split("/")[:-1])
if rkey is not None and rkey.path not in [basepath, basepath[:-1]]:
path = (basepath + '/'.join(path)).replace(rkey.path + '/', '').split('/')
basepath = rkey.path + '/'
:type purpose: int
:param multisig_compressed: Use compressed multisig keys for this wallet. Default is True
:type multisig_compressed: bool
:param sort_keys: Sort keys according to BIP45 standard (used for multisig keys)
:type sort_keys: bool
:param databasefile: Location of database file. Leave empty to use default
:type databasefile: str
:return HDWallet:
"""
if databasefile is None:
databasefile = DEFAULT_DATABASE
session = DbInit(databasefile=databasefile).session
if session.query(DbWallet).filter_by(name=name).count():
raise WalletError("Wallet with name '%s' already exists" % name)
else:
_logger.info("Create new multisig wallet '%s'" % name)
if not isinstance(key_list, list):
raise WalletError("Need list of keys to create multi-signature key structure")
if len(key_list) < 2:
raise WalletError("Key list must contain at least 2 keys")
if sigs_required is None:
sigs_required = len(key_list)
if sigs_required > len(key_list):
raise WalletError("Number of keys required to sign is greater then number of keys provided")
hdpm = cls.create(name=name, owner=owner, network=network, account_id=account_id,
purpose=purpose, scheme='multisig', sort_keys=sort_keys, databasefile=databasefile)
hdpm.multisig_compressed = multisig_compressed
co_id = 0
hdpm.cosigner = []
wlt = None
if args.wallet_name and not args.wallet_name.isdigit() and not wallet_exists(args.wallet_name,
db_uri=db_uri):
if not args.create_from_key and input(
"Wallet %s does not exist, create new wallet [yN]? " % args.wallet_name).lower() != 'y':
clw_exit('Aborted')
wlt = create_wallet(args.wallet_name, args, db_uri)
args.wallet_info = True
else:
try:
wlt = HDWallet(args.wallet_name, db_uri=db_uri)
if args.passphrase is not None:
print("WARNING: Using passphrase option for existing wallet ignored")
if args.create_from_key is not None:
print("WARNING: Using create_from_key option for existing wallet ignored")
except WalletError as e:
clw_exit("Error: %s" % e.msg)
if wlt is None:
clw_exit("Could not open wallet %s" % args.wallet_name)
if args.import_private:
if wlt.import_key(args.import_private):
clw_exit("Private key imported")
else:
clw_exit("Failed to import key")
if args.wallet_recreate:
wallet_empty(args.wallet_name)
print("Removed transactions and generated keys from this wallet")
if args.update_utxos:
wlt.utxos_update()
"""
if databasefile is None:
databasefile = DEFAULT_DATABASE
session = DbInit(databasefile=databasefile).session
if session.query(DbWallet).filter_by(name=name).count():
raise WalletError("Wallet with name '%s' already exists" % name)
else:
_logger.info("Create new multisig wallet '%s'" % name)
if not isinstance(key_list, list):
raise WalletError("Need list of keys to create multi-signature key structure")
if len(key_list) < 2:
raise WalletError("Key list must contain at least 2 keys")
if sigs_required is None:
sigs_required = len(key_list)
if sigs_required > len(key_list):
raise WalletError("Number of keys required to sign is greater then number of keys provided")
hdpm = cls.create(name=name, owner=owner, network=network, account_id=account_id,
purpose=purpose, scheme='multisig', sort_keys=sort_keys, databasefile=databasefile)
hdpm.multisig_compressed = multisig_compressed
co_id = 0
hdpm.cosigner = []
hdkey_list = []
for cokey in key_list:
if not isinstance(cokey, HDKey):
if len(cokey.split(' ')) > 5:
k = HDKey().from_passphrase(cokey, network=network)
else:
k = HDKey(cokey)
hdkey_list.append(k)
else:
hdkey_list.append(cokey)
clw_exit("\nError when deleting wallet")
else:
clw_exit("\nSpecified wallet name incorrect")
wlt = None
if args.wallet_name and not args.wallet_name.isdigit() and \
not wallet_exists(args.wallet_name, databasefile=databasefile):
if input("Wallet %s does not exist, create new wallet [yN]? " % args.wallet_name).lower() == 'y':
wlt = create_wallet(args.wallet_name, args, databasefile)
args.wallet_info = True
else:
try:
wlt = HDWallet(args.wallet_name, databasefile=databasefile)
if args.passphrase is not None or args.passphrase_strength is not None:
print("WARNING: Using passphrase options for existing wallet ignored")
except WalletError as e:
clw_exit("Error: %s" % e.msg)
if wlt is None:
clw_exit("Could not open wallet %s" % args.wallet_name)
if args.receive:
addr = wlt.get_key().address
print("Receive address is %s" % addr)
if QRCODES_AVAILABLE:
qrcode = pyqrcode.create(addr)
print(qrcode.terminal())
else:
print("Install qr code module to show QR codes: pip install pyqrcode")
clw_exit()
if args.scan:
print("Scanning wallet: updating addresses, transactions and balances")
print("Can take a while")
def _objects_by_key_id(key_id):
    """
    Look up a wallet key by database ID and derive the signing material for a transaction input.

    :param key_id: ID of the DbKey record to look up
    :return tuple: (inp_keys, script_type, key). For a 'multisig' key inp_keys is a list with one entry per
        multisig child key and script_type is 'p2sh_multisig'. For a 'bip32' or 'single' key inp_keys is a
        single key value and script_type is 'p2pkh'. The third item is the DbKey record itself.
    :raises WalletError: if no key with this ID is found, or if the key type is not supported
    """
    db_key = self._session.query(DbKey).filter_by(id=key_id).scalar()
    if not db_key:
        raise WalletError("Key '%s' not found in this wallet" % key_id)

    # Multisig keys: collect the key of every cosigner child, in stored order
    if db_key.key_type == 'multisig':
        cosigner_keys = [HDKey(child.child_key.wif).key for child in db_key.multisig_children]
        return cosigner_keys, 'p2sh_multisig', db_key

    # Regular keys: a single key, honouring the stored compression flag
    if db_key.key_type in ['bip32', 'single']:
        single_key = HDKey(db_key.wif, compressed=db_key.compressed).key
        return single_key, 'p2pkh', db_key

    raise WalletError("Input key type %s not supported" % db_key.key_type)
transaction.fee_per_kb = None
if transaction_fee is None:
if not input_arr:
transaction.fee_per_kb = srv.estimatefee()
fee_estimate = (transaction.estimate_size() / 1024.0 * transaction.fee_per_kb)
else:
fee_estimate = 0
else:
fee_estimate = transaction_fee
# Add inputs
amount_total_input = 0
if input_arr is None:
selected_utxos = _select_inputs(amount_total_output + fee_estimate, self.network.dust_amount)
if not selected_utxos:
raise WalletError("Not enough unspent transaction outputs found")
for utxo in selected_utxos:
amount_total_input += utxo.value
inp_keys, script_type, key = _objects_by_key_id(utxo.key_id)
transaction.add_input(utxo.transaction.hash, utxo.output_n, keys=inp_keys, script_type=script_type,
sigs_required=self.multisig_n_required, sort=self.sort_keys,
compressed=key.compressed, value=utxo.value)
else:
for inp in input_arr:
if isinstance(inp, Input):
prev_hash = inp.prev_hash
output_n = inp.output_n
key_id = None
value = inp.value
signatures = inp.signatures
unlocking_script = inp.unlocking_script
address = inp.address
if network is None:
network = self.network.network_name
# Determine account_id and name
if account_id is None:
account_id = 0
qr = self._session.query(DbKey). \
filter_by(wallet_id=self.wallet_id, purpose=self.purpose, network_name=network). \
order_by(DbKey.account_id.desc()).first()
if qr:
account_id = qr.account_id + 1
if not name:
name = 'Account #%d' % account_id
if self.keys(account_id=account_id, depth=3, network=network):
raise WalletError("Account with ID %d already exists for this wallet" % account_id)
if name in [k.name for k in self.keys(network=network)]:
raise WalletError("Account or key with name %s already exists for this wallet" % name)
# Get root key of new account
res = self.keys(depth=2, network=network)
if not res:
try:
purposekey = self.key(self.keys(depth=1)[0].id)
bip44_cointype = Network(network).bip44_cointype
duplicate_cointypes = [Network(x).network_name for x in self.network_list() if
Network(x).bip44_cointype == bip44_cointype]
if duplicate_cointypes:
raise WalletError("Can not create new account for network %s with same BIP44 cointype: %s" %
(network, duplicate_cointypes))
accrootkey_obj = self._create_keys_from_path(
purposekey, ["%s'" % str(bip44_cointype)], name=network, wallet_id=self.wallet_id,
:param scan_gap_limit: Number of new keys and change keys (addresses) created for this wallet
:type scan_gap_limit: int
:param account_id: Account ID. Default is last used or created account ID.
:type account_id: int
:param change: Filter by change addresses. Set to True to include only change addresses, or False to include only regular addresses. Use None (default) to disable the filter and include both.
:param network: Network name. Leave empty for default network
:type network: str
:return:
"""
if _keys_ignore is None:
_keys_ignore = []
if _recursion_depth > 10:
raise WalletError("UTXO scanning has reached a recursion depth of more then 10")
_recursion_depth += 1
if self.scheme != 'bip44' and self.scheme != 'multisig':
raise WalletError("The wallet scan() method is only available for BIP44 wallets")
if change != 1:
scanned_keys = self.get_key(account_id, network, number_of_keys=scan_gap_limit)
new_key_ids = [k.key_id for k in scanned_keys]
nr_new_txs = 0
new_key_ids = list(set(new_key_ids) - set(_keys_ignore))
n_highest_updated = 0
for new_key_id in new_key_ids:
n_new = self.transactions_update(change=0, key_id=new_key_id)
if n_new:
n_highest_updated = new_key_id if n_new and n_highest_updated < new_key_id else n_highest_updated
nr_new_txs += n_new
for key_id in [key_id for key_id in new_key_ids if key_id < n_highest_updated]: