Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ssh_username=SSH_USERNAME,
ssh_password=SSH_PASSWORD,
remote_bind_address=(self.eaddr, self.eport),
local_bind_address=('', self.randomize_eport()),
logger=self.log
) as server:
keys = server.get_keys()
self.assertIsInstance(keys, list)
self.assertFalse(any('keys loaded from agent' in l for l in
self.sshtunnel_log_messages['info']))
tmp_dir = tempfile.mkdtemp()
shutil.copy(get_test_data_path(PKEY_FILE),
os.path.join(tmp_dir, 'id_rsa'))
keys = sshtunnel.SSHTunnelForwarder.get_keys(
self.log,
host_pkey_directories=[tmp_dir, ]
)
self.assertIsInstance(keys, list)
self.assertTrue(
any('1 keys loaded from host directory' in l
for l in self.sshtunnel_log_messages['info'])
)
shutil.rmtree(tmp_dir)
def _consolidate_auth(ssh_password=None,
ssh_pkey=None,
ssh_pkey_password=None,
allow_agent=True,
host_pkey_directories=None,
logger=None):
"""
Make sure authentication information is in place.
``ssh_pkey`` may be one of the following types:
- ``str`` - in this case it represents the path to a private key file;
the public key will be obtained from it
- ``paramiko.PKey`` - it will be transparently added to the loaded keys
"""
ssh_loaded_pkeys = SSHTunnelForwarder.get_keys(
logger=logger,
host_pkey_directories=host_pkey_directories,
allow_agent=allow_agent
)
if isinstance(ssh_pkey, string_types):
ssh_pkey_expanded = os.path.expanduser(ssh_pkey)
if os.path.exists(ssh_pkey_expanded):
ssh_pkey = SSHTunnelForwarder.read_private_key_file(
pkey_file=ssh_pkey_expanded,
pkey_password=ssh_pkey_password or ssh_password,
logger=logger
)
elif logger:
logger.warning('Private key file not found: {0}'
.format(ssh_pkey))