Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_exception_handling(self):
def test_exception_handling(to_raise, to_catch):
with self.assertRaises(to_catch):
with self.assertLogs(level=logging.WARNING):
with self.moab_adapter.handle_exceptions():
raise to_raise
matrix = [
(asyncio.TimeoutError(), TardisTimeout),
(
asyncssh.Error(code=255, reason="Test", lang="Test"),
TardisResourceStatusUpdateFailed,
),
(IndexError, TardisResourceStatusUpdateFailed),
(TardisResourceStatusUpdateFailed, TardisResourceStatusUpdateFailed),
(
CommandExecutionFailure(
message="Run test command",
exit_code=1,
stdout="Test",
stderr="Test",
),
TardisResourceStatusUpdateFailed,
),
(Exception, TardisError),
]
def __init__(self, host='localhost', port=8022, client_keys=None, known_hosts=None, max_packet_size=32768):
self.max_packet_size = max_packet_size
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._session = asyncio.get_event_loop().run_until_complete(self.__connect(host, port, known_hosts, client_keys))
try:
t = Thread(target=self.__start_loop, args=(self._loop,))
t.start()
# asyncio.run_coroutine_threadsafe(self.__connect(), self._loop)
except (OSError, asyncssh.Error) as exc:
sys.exit(f'SSH connection failed: {exc}')
def handle_client(process):
process.stdout.write('Welcome to my SSH server, %s!\n' %
process.get_extra_info('username'))
process.exit(0)
async def start_server():
await asyncssh.listen('', 8022, server_host_keys=['ssh_host_key'],
authorized_client_keys='ssh_user_ca',
process_factory=handle_client)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
loop.run_forever()
print(data, end='', file=sys.stderr)
else:
print(data, end='')
def connection_lost(self, exc):
if exc:
print('SSH session error: ' + str(exc), file=sys.stderr)
async def run_client():
async with asyncssh.connect('localhost') as conn:
chan, session = await conn.create_session(MySSHClientSession, 'ls abc')
await chan.wait_closed()
try:
asyncio.get_event_loop().run_until_complete(run_client())
except (OSError, asyncssh.Error) as exc:
sys.exit('SSH connection failed: ' + str(exc))
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
import asyncio, asyncssh, sys
async def run_client():
async with asyncssh.connect('localhost') as conn:
result = await conn.run('echo $TERM; stty size',
term_type='xterm-color',
term_size=(80, 24))
print(result.stdout, end='')
try:
asyncio.get_event_loop().run_until_complete(run_client())
except (OSError, asyncssh.Error) as exc:
sys.exit('SSH connection failed: ' + str(exc))
print('Direct connection error:', str(exc), file=sys.stderr)
async def run_client():
async with asyncssh.connect('localhost') as conn:
chan, session = await conn.create_connection(MySSHTCPSession,
'www.google.com', 80)
# By default, TCP connections send and receive bytes
chan.write(b'HEAD / HTTP/1.0\r\n\r\n')
chan.write_eof()
await chan.wait_closed()
try:
asyncio.get_event_loop().run_until_complete(run_client())
except (OSError, asyncssh.Error) as exc:
sys.exit('SSH connection failed: ' + str(exc))
except asyncssh.BreakReceived:
pass
process.stdout.write('Total = %s\n' % total)
process.exit(0)
async def start_server():
await asyncssh.listen('', 8022, server_host_keys=['ssh_host_key'],
authorized_client_keys='ssh_user_ca',
process_factory=handle_client)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
loop.run_forever()
#
# The file ``ssh_user_ca`` must exist with a cert-authority entry of
# the certificate authority which can sign valid client certificates.
import asyncio, asyncssh, sys
async def start_server():
await asyncssh.listen('', 8022, server_host_keys=['ssh_host_key'],
authorized_client_keys='ssh_user_ca',
sftp_factory=True, allow_scp=True)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
loop.run_forever()
return handle_connection
else:
raise asyncssh.ChannelOpenError(
asyncssh.OPEN_ADMINISTRATIVELY_PROHIBITED,
'Only echo connections allowed')
async def start_server():
await asyncssh.create_server(MySSHServer, '', 8022,
server_host_keys=['ssh_host_key'],
authorized_client_keys='ssh_user_ca')
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('SSH server failed: ' + str(exc))
loop.run_forever()
if e.code == '1':
verrors.add(
f'{schema}.remotepath',
'The Remote Path you specified does not exist or is not a directory.'
'Either create one yourself on the remote machine or uncheck the '
'validate_rpath field'
)
else:
verrors.add(
f'{schema}.remotepath',
f'Connection to Remote Host was successful but failed to verify '
f'Remote Path. {e.__str__()}'
)
except asyncssh.Error as e:
if e.__class__.__name__ in e.__str__():
exception_reason = e.__str__()
else:
exception_reason = e.__class__.__name__ + ' ' + e.__str__()
verrors.add(
f'{schema}.remotepath',
f'Remote Path could not be validated. An exception was raised. {exception_reason}'
)
elif data['enabled'] and data['validate_rpath']:
verrors.add(
f'{schema}.remotepath',
'Remote path could not be validated because of missing fields'
)
data.pop('validate_rpath', None)