Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
with mock.patch('neo.Prompt.Utils.get_input_prompt', return_value="b'abc'") as fake_prompt:
result, abort = Utils.gather_param(0, ContractParameterType.Array, do_continue=False)
self.assertRaises(Exception, "Please provide a list")
self.assertEqual(result, None)
self.assertEqual(abort, True)
# test ContractParameterType.PublicKey
with mock.patch('neo.Prompt.Utils.get_input_prompt', return_value="03cbb45da6072c14761c9da545749d9cfd863f860c351066d16df480602a2024c6") as fake_prompt:
test_wallet_path = shutil.copyfile(
WalletFixtureTestCase.wallet_1_path(),
WalletFixtureTestCase.wallet_1_dest()
)
wallet = UserWallet.Open(
test_wallet_path,
to_aes_key(WalletFixtureTestCase.wallet_1_pass())
)
addr_scripthash = wallet.GetStandardAddress()
result, abort = Utils.gather_param(0, ContractParameterType.PublicKey)
script = b'21' + result.encode_point(True) + b'ac'
pk_script_hash = Crypto.ToScriptHash(script)
self.assertEqual(addr_scripthash, pk_script_hash) # verifies the functionality of ContractParameterType.PublicKey
wallet.Close()
wallet = None
os.remove(WalletFixtureTestCase.wallet_1_dest())
# test unknown ContractParameterType
def test_send_to_address_bad_address(self):
test_wallet_path = shutil.copyfile(
WalletFixtureTestCase.wallet_1_path(),
WalletFixtureTestCase.wallet_1_dest()
)
self.app.wallet = UserWallet.Open(
test_wallet_path,
to_aes_key(WalletFixtureTestCase.wallet_1_pass())
)
address = 'AXjaFSP23Jkbe6Pk9pPGT6NBDs1HVdqaX' # "AXjaFSP23Jkbe6Pk9pPGT6NBDs1HVdqaX" is too short causing ToScriptHash to fail
req = self._gen_post_rpc_req("sendtoaddress", params=['gas', address, 1])
mock_req = mock_post_request(json.dumps(req).encode("utf-8"))
res = json.loads(self.app.home(mock_req))
error = res.get('error', {})
self.assertEqual(error.get('code', None), -32602)
self.assertEqual(error.get('message', None), "Invalid params")
self.app.wallet.Close()
self.app.wallet = None
os.remove(WalletFixtureTestCase.wallet_1_dest())
def test_wallet_height(self):
wallet = UserWallet("fakepath", to_aes_key("123"), True)
wallet.SaveStoredData('Height', 1234)
wallet._current_height = wallet.LoadStoredData('Height')
self.assertEqual(wallet._current_height, 1234)
self.assertEqual(wallet.WalletHeight, 1234)
def test_gettxhistory(self):
test_wallet_path = shutil.copyfile(
WalletFixtureTestCase.wallet_1_path(),
WalletFixtureTestCase.wallet_1_dest()
)
self.app.wallet = UserWallet.Open(
test_wallet_path,
to_aes_key(WalletFixtureTestCase.wallet_1_pass())
)
req = self._gen_post_rpc_req("gettxhistory")
mock_req = mock_post_request(json.dumps(req).encode("utf-8"))
res = json.loads(self.app.home(mock_req))
for tx in res['result']:
self.assertIn('txid', tx.keys())
self.assertIsNotNone(tx['txid'])
self.assertIn('block_index', tx.keys())
self.assertIsNotNone(tx['block_index'])
self.assertIn('blocktime', tx.keys())
self.assertIsNotNone(tx['blocktime'])
self.assertEqual(len(res['result']), 9)
self.app.wallet.Close()
self.app.wallet = None
os.remove(WalletFixtureTestCase.wallet_1_dest())
def test_sendmany_bad_address(self):
test_wallet_path = shutil.copyfile(
WalletFixtureTestCase.wallet_1_path(),
WalletFixtureTestCase.wallet_1_dest()
)
self.app.wallet = UserWallet.Open(
test_wallet_path,
to_aes_key(WalletFixtureTestCase.wallet_1_pass())
)
address_to = 'AXjaFSP23Jkbe6Pk9pPGT6NBDs1HVdqaX' # "AXjaFSP23Jkbe6Pk9pPGT6NBDs1HVdqaX" is too short causing ToScriptHash to fail
output = [{"asset": 'neo',
"value": 1,
"address": address_to},
{"asset": 'neo',
"value": 1,
"address": address_to}]
req = self._gen_post_rpc_req("sendmany", params=[output])
mock_req = mock_post_request(json.dumps(req).encode("utf-8"))
res = json.loads(self.app.home(mock_req))
error = res.get('error', {})
self.assertEqual(error.get('code', None), -32602)
self.assertEqual(error.get('message', None), "Invalid params")
self.app.wallet.Close()
self.app.wallet = None
def GetWallet1(cls, recreate=False):
if cls._wallet1 is None or recreate:
cls._wallet1 = UserWallet.Create(UserWalletTestCase.new_wallet_dest(),
to_aes_key('awesomepassword'))
return cls._wallet1
def claim_initial_neo(self, target_address):
'''
claim all initial neo to a target_address
'''
wallets = []
tx_json = None
dbloops = []
print("Signing new transaction with 1 of 1 node keys...")
walletpath = "wallet1.db3"
if os.path.exists(walletpath):
os.remove(walletpath)
wallet = UserWallet.Create(path=walletpath, password=to_aes_key(self.wallet_pwd))
wallets.append(wallet)
print("Importing node private key to to {}".format(walletpath))
prikey = KeyPair.PrivateKeyFromWIF(wif)
wallet.CreateKey(prikey)
keys = list(nodekeys.keys())
print("Importing multi-sig contract to {}".format(walletpath))
pubkey_script_hash = Crypto.ToScriptHash(pubkey, unhex=True)
verification_contract = Contract.CreateMultiSigContract(pubkey_script_hash, 1, keys)
wallet.AddContract(verification_contract)
print("Added multi-sig contract address %s to wallet" % verification_contract.Address)
dbloop = task.LoopingCall(wallet.ProcessBlocks)
dbloop.start(1)
dbloops.append(dbloop)
path = get_arg(arguments, 1)
if path:
if os.path.exists(path):
print("File already exists")
return
passwd1 = prompt("[password]> ", is_password=True)
passwd2 = prompt("[password again]> ", is_password=True)
if passwd1 != passwd2 or len(passwd1) < 10:
print("Please provide matching passwords that are at least 10 characters long")
return
password_key = to_aes_key(passwd1)
try:
self.Wallet = UserWallet.Create(path=path,
password=password_key)
contract = self.Wallet.GetDefaultContract()
key = self.Wallet.GetKey(contract.PublicKeyHash)
print("Wallet %s" % json.dumps(self.Wallet.ToJson(), indent=4))
print("Pubkey %s" % key.PublicKey.encode_point(True))
except Exception as e:
print("Exception creating wallet: %s" % e)
self.Wallet = None
if os.path.isfile(path):
try:
os.remove(path)
except Exception as e:
print("Could not remove {}: {}".format(path, e))
if LoadStoredData('MigrationState') == '1':
print("This wallet was already secured")
return False
else:
print("The wallet is vulnerable, will proceed with the operation.")
# Decrypt Master Key - Without using a password
master_enc = LoadStoredData('MasterKey')
passwordHash = LoadStoredData('PasswordHash')
iv = LoadStoredData('IV')
aes_dec = AES.new(passwordHash, AES.MODE_CBC, iv)
master_key = aes_dec.decrypt(master_enc)
# Encrypt again with the new password
new_key = to_aes_key(new_password)
new_hash = hashlib.sha256(new_key).digest()
aes_enc = AES.new(new_key, AES.MODE_CBC, iv)
mk = aes_enc.encrypt(master_key)
SaveStoredData('PasswordHash', new_hash)
SaveStoredData('MasterKey', mk)
SaveStoredData('MigrationState', '1')
db.close()
return True
def setup_wallet(self, wallet_path):
if not os.path.exists(wallet_path):
raise ValueError("Wallet file not found")
self.wallet_path = wallet_path
self.syncd_wallet_path = wallet_path + ".syncd"
if not os.path.exists(self.syncd_wallet_path):
self.logger.info("Creating syncd copy of wallet file...")
copyfile(self.wallet_path, self.syncd_wallet_path)
wallet_passwd = prompt("[password]> ", is_password=True)
self.wallet_passwd_key = to_aes_key(wallet_passwd)
self.setup_network()