Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _check_server_auth(self):
    """Assert that the SSH server transport came up and authenticated.

    Waits on ``self.ssh_event`` (bounded by ``sshtunnel.SSH_TIMEOUT``) and
    then checks, in order, that the event is set, that ``self.ts`` is
    active, that it reports ``SSH_USERNAME`` as its username, and that it
    is authenticated. The first failing check aborts the test.
    """
    # Check if authentication to server was successful
    self.ssh_event.wait(sshtunnel.SSH_TIMEOUT)  # wait for transport
    # wait() may simply time out, so the event state is asserted explicitly.
    self.assertTrue(self.ssh_event.is_set())
    self.assertTrue(self.ts.is_active())
    self.assertEqual(self.ts.get_username(), SSH_USERNAME)
    self.assertTrue(self.ts.is_authenticated())
# Forwarding-server side of the test: accepts a channel from ``self.ts`` and
# opens a socket to the echo endpoint at (``self.eaddr``, ``self.eport``).
# NOTE(review): only the beginning of this method is visible here -- the
# ``try`` has no handler yet and the loop body stops after the first
# ``recv``; presumably the data is then relayed between the two endpoints.
# NOTE: the ``timeout`` default is evaluated once, when the ``def`` executes,
# so later changes to ``sshtunnel.SSH_TIMEOUT`` do not affect it.
def _do_forwarding(self, timeout=sshtunnel.SSH_TIMEOUT):
self.log.debug('forward-server Start')
self.ssh_event.wait(THREADS_TIMEOUT) # wait for SSH server's transport
try:
# Accept a channel from the transport, passing ``timeout`` through.
schan = self.ts.accept(timeout=timeout)
info = "forward-server schan <> echo"
self.log.info(info + " accept()")
# Plain TCP connection to the echo endpoint.
echo = socket.create_connection(
(self.eaddr, self.eport)
)
# Poll both endpoints for readability until ``is_server_working`` goes
# false; ``select`` returns after at most ``timeout`` so the flag is
# re-checked regularly.
while self.is_server_working:
rqst, _, _ = select.select([schan, echo],
[],
[],
timeout)
if schan in rqst:
# Read at most 1024 bytes from the channel.
data = schan.recv(1024)
# 16-byte value registered below under 'ecdsa-sha2-nistp256'
# (presumably the expected host-key fingerprint for that key type -- confirm
# against where FINGERPRINTS is consumed).
ECDSA = b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60'
# Lookup from SSH key-type name to its value; SSH_DSS and SSH_RSA are defined
# outside this excerpt.
FINGERPRINTS = {
'ssh-dss': SSH_DSS,
'ssh-rsa': SSH_RSA,
'ecdsa-sha2-nistp256': ECDSA,
}
DAEMON_THREADS = False
# Absolute path of the directory that contains this file.
HERE = path.abspath(path.dirname(__file__))
# Timeout passed to ``ssh_event.wait`` in ``_do_forwarding``.
THREADS_TIMEOUT = 5.0
# File names of the test fixtures (presumably resolved relative to the test
# data directory -- verify against get_test_data_path).
PKEY_FILE = 'testrsa.key'
ENCRYPTED_PKEY_FILE = 'testrsa_encrypted.key'
TEST_CONFIG_FILE = 'testconfig'
TEST_UNIX_SOCKET = get_test_data_path('test_socket')
# Import-time overrides of attributes on the ``sshtunnel`` module itself;
# SSH_TIMEOUT is the value ``_check_server_auth`` waits with.
sshtunnel.TRACE = True
sshtunnel.SSH_TIMEOUT = 1.0
# TESTS
class MockLoggingHandler(logging.Handler, object):
    """Mock logging handler to check for expected logs.

    Messages are available from an instance's `messages` dict, in order,
    indexed by a lowercase log level string (e.g., 'debug', 'info', etc.).
    """

    def __init__(self, *args, **kwargs):
        """Create one empty message list per level, then set up the handler.

        All positional and keyword arguments are forwarded unchanged to
        ``logging.Handler.__init__``.
        """
        # Each level gets its own fresh list; ``messages`` is assigned before
        # the base initialiser runs, so it exists as soon as the handler does.
        self.messages = {'debug': [], 'info': [], 'warning': [], 'error': [],
                         'critical': [], 'trace': []}
        super(MockLoggingHandler, self).__init__(*args, **kwargs)