Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _stop_transport(self):
""" Close the underlying transport when nothing more is needed """
try:
self._check_is_started()
except (BaseSSHTunnelForwarderError,
HandlerSSHTunnelForwarderError) as e:
self.logger.warning(e)
for _srv in self._server_list:
tunnel = _srv.local_address
if self.tunnel_is_up[tunnel]:
self.logger.info('Shutting down tunnel {0}'.format(tunnel))
_srv.shutdown()
_srv.server_close()
# clean up the UNIX domain socket if we're using one
if isinstance(_srv, _UnixStreamForwardServer):
try:
os.unlink(_srv.local_address)
except Exception as e:
self.logger.error('Unable to unlink socket {0}: {1}'
.format(self.local_address, repr(e)))
self.is_alive = False
if self.is_active:
self._transport.close()
self._transport.stop_thread()
self.logger.debug('Transport is closed')
def _make_unix_ssh_forward_server_class(self, remote_address_):
return _ThreadingUnixStreamForwardServer if \
self._threaded else _UnixStreamForwardServer
@property
def remote_address(self):
return self.RequestHandlerClass.remote_address
@property
def remote_host(self):
return self.RequestHandlerClass.remote_address[0]
@property
def remote_port(self):
return self.RequestHandlerClass.remote_address[1]
class _ThreadingUnixStreamForwardServer(socketserver.ThreadingMixIn,
_UnixStreamForwardServer):
"""
Allow concurrent connections to each tunnel
"""
# If True, cleanly stop threads created by ThreadingMixIn when quitting
daemon_threads = _DAEMON
class SSHTunnelForwarder(object):
"""
**SSH tunnel class**
- Initialize a SSH tunnel to a remote host according to the input
arguments
- Optionally:
+ Read an SSH configuration file (typically ``~/.ssh/config``)