Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_runner_doesnt_impact_unrised_observer_exception_while_taking_observer_result(
        connection_observer, observer_runner):
    """Taking the observer's result must keep its timeout among not-raised exceptions.

    The observer is submitted to the runner and then timed out explicitly
    (``passed_time`` larger than ``timeout``). The exception stored on the
    observer has to be present in ``ConnectionObserver._not_raised_exceptions``
    before the result is requested, and the ``ConnectionObserverTimeout`` caught
    while requesting the result has to be present there as well.

    NOTE(review): a test with exactly this name is defined twice in this file;
    Python keeps only the later definition - confirm whether one copy can go.
    """
    from moler.runner import result_for_runners, time_out_observer
    from moler.exceptions import ConnectionObserverTimeout

    # NOTE(review): source indentation was lost; the ``with`` body is assumed
    # to end right after time_out_observer() - confirm against the project.
    with observer_runner:
        # must start observer lifetime before runner.submit()
        connection_observer.life_status.start_time = time.time()
        observer_runner.submit(connection_observer)
        time_out_observer(
            connection_observer,
            timeout=2.3,
            passed_time=2.32,
            runner_logger=mock.MagicMock(),
        )

    stored_exception = connection_observer._exception
    assert stored_exception in ConnectionObserver._not_raised_exceptions
    try:
        result_for_runners(connection_observer)
    except ConnectionObserverTimeout as raised_exception:
        # distinct name: the handler target no longer rebinds the earlier local
        assert raised_exception in ConnectionObserver._not_raised_exceptions
def test_runner_doesnt_impact_unrised_observer_exception_while_taking_observer_result(
        connection_observer, observer_runner):
    """Verify the timeout stays registered as not-raised while the result is taken.

    Steps: set the observer's start time, submit it to the runner, time it out
    via ``time_out_observer`` and then ask ``result_for_runners`` for the
    result. At each check the exception must be a member of
    ``ConnectionObserver._not_raised_exceptions``.

    NOTE(review): this definition repeats an identically named test that
    appears earlier in the file and therefore shadows it - confirm intent.
    """
    from moler.runner import time_out_observer, result_for_runners
    from moler.exceptions import ConnectionObserverTimeout

    # NOTE(review): original indentation is not recoverable from the source;
    # the runner context is presumed to cover only the three statements below.
    with observer_runner:
        # observer lifetime has to begin before it is handed to runner.submit()
        connection_observer.life_status.start_time = time.time()
        observer_runner.submit(connection_observer)
        logger_stub = mock.MagicMock()
        time_out_observer(connection_observer, timeout=2.3, passed_time=2.32,
                          runner_logger=logger_stub)

    not_raised = ConnectionObserver._not_raised_exceptions
    assert connection_observer._exception in not_raised
    try:
        result_for_runners(connection_observer)
    except ConnectionObserverTimeout as caught:
        # re-read the class attribute, as the original does, in case it was rebound
        assert caught in ConnectionObserver._not_raised_exceptions
def test_su_catches_authentication_failure(buffer_connection, command_output_and_expected_result_auth):
    """Running ``Su`` against the injected output must raise ``CommandFailure``.

    Only the command output half of the fixture pair is used; it is injected
    into the buffer connection as the remote side's response.
    """
    from moler.exceptions import CommandFailure

    injected_output, _ = command_output_and_expected_result_auth
    buffer_connection.remote_inject_response([injected_output])
    su_cmd = Su(
        connection=buffer_connection.moler_connection,
        prompt=r"xyz@debian:",
        expected_prompt=r"root@debian",
    )
    with pytest.raises(CommandFailure):
        su_cmd()
def test_run_script_raise_exception(buffer_connection, command_output_and_expected_result):
    """``RunScript`` must raise ``CommandFailure`` for the injected script output.

    The expected-result half of the fixture pair is not needed here because the
    command is expected to fail instead of returning a result.
    """
    injected_output, _ = command_output_and_expected_result
    buffer_connection.remote_inject_response([injected_output])
    run_script_cmd = RunScript(
        connection=buffer_connection.moler_connection,
        script_command="./myScript.sh",
    )
    with pytest.raises(CommandFailure):
        run_script_cmd()
def test_socat_raise_connection_refused(buffer_connection, command_output_and_expected_result_on_connection_refused):
    """``Socat`` must build the expected command line and fail on the injected output.

    Checks two things: the command string assembled from the options, and that
    calling the command raises ``CommandFailure`` once the fixture's output has
    been injected into the buffer connection.
    """
    socat_cmd = Socat(
        connection=buffer_connection.moler_connection,
        input_options='STDIO',
        output_options='tcp:localhost:3334',
        options='-d',
    )
    injected_output, _ = command_output_and_expected_result_on_connection_refused
    buffer_connection.remote_inject_response([injected_output])

    assert 'socat -d STDIO tcp:localhost:3334' == socat_cmd.command_string
    with pytest.raises(CommandFailure):
        socat_cmd()
def test_can_open_and_close_connection(tcp_connection_class,
                                       integration_tcp_server_and_pipe):
    """
    Open a TCP connection, close it, and check the server recorded both events.

    Two behaviours are checked in one test on purpose:
    - this is an integration test
    - open() needs a matching close() as cleanup anyway, so that tests
      do not leak resources
    """
    from moler.threaded_moler_connection import ThreadedMolerConnection

    server, server_pipe = integration_tcp_server_and_pipe
    moler_conn = ThreadedMolerConnection()
    connection = tcp_connection_class(
        moler_connection=moler_conn,
        port=server.port,
        host=server.host,
    )

    connection.open()
    connection.close()

    # otherwise we have race between server's pipe and from-client-connection
    time.sleep(0.1)
    server_pipe.send(("get history", {}))
    history = server_pipe.recv()

    for expected_entry in ('Client connected', 'Client disconnected'):
        assert expected_entry in history
def connection_observer(observer_runner):
    """Yield a ``NetworkDownDetector`` wired to a new connection and the given runner.

    After the consumer is done, exceptions collected inside
    ``ConnectionObserver`` are removed so they do not carry over.

    NOTE(review): this generator is presumably registered as a pytest fixture;
    the decorator is not visible here - confirm.
    """
    from moler.threaded_moler_connection import ThreadedMolerConnection

    yield NetworkDownDetector(connection=ThreadedMolerConnection(), runner=observer_runner)

    # teardown: remove exceptions collected inside ConnectionObserver
    ConnectionObserver.get_unraised_exceptions(remove=True)
def test_can_open_and_close_connection_as_context_manager(tcp_connection_class,
                                                          integration_tcp_server_and_pipe):
    """Use ``connection.open()`` as a context manager and check the server's history.

    The ``with`` body is intentionally empty: entering and leaving the context
    is the behaviour under test. The server dialog is then fetched by waiting
    (up to 5 seconds) for the 'Client disconnected' message.
    """
    from moler.threaded_moler_connection import ThreadedMolerConnection

    server, server_pipe = integration_tcp_server_and_pipe
    moler_conn = ThreadedMolerConnection()
    connection = tcp_connection_class(
        moler_connection=moler_conn,
        port=server.port,
        host=server.host,
    )

    with connection.open():
        pass

    history = _wait_for_last_message(
        tcp_server_pipe=server_pipe,
        last_message='Client disconnected',
        timeout=5,
    )
    for expected_entry in ('Client connected', 'Client disconnected'):
        assert expected_entry in history
def test_can_send_binary_data_over_connection(tcp_connection_class,
                                              integration_tcp_server_and_pipe):
    """Send raw bytes through an open connection and check the server received them.

    The history is requested while the connection is still open, and the
    assertion looks at the last history entry, which must be the received data.
    """
    from moler.threaded_moler_connection import ThreadedMolerConnection

    server, server_pipe = integration_tcp_server_and_pipe
    # no decoder, just pass bytes 1:1
    moler_conn = ThreadedMolerConnection()
    connection = tcp_connection_class(
        moler_connection=moler_conn,
        port=server.port,
        host=server.host,
    )

    with connection.open():
        # TODO: await moler_conn.send(data=b'data to be send') ???
        moler_conn.send(data=b'data to be send')
        # otherwise we have race between server's pipe and from-client-connection
        time.sleep(0.1)
        server_pipe.send(("get history", {}))
        history = server_pipe.recv()
        last_entry = history[-1]
        assert ['Received data:', b'data to be send'] == last_entry
def failing_net_down_detector(fail_on_data, fail_by_raising, runner):
    """Yield a ``NetworkDownDetector`` variant that raises on one specific input.

    :param fail_on_data: data value that triggers the failure when received.
    :param fail_by_raising: exception raised when ``fail_on_data`` arrives.
    :param runner: runner passed to the detector.

    Any other data is delegated to ``NetworkDownDetector.data_received``.
    After the consumer is done, exceptions collected inside
    ``ConnectionObserver`` are removed.

    NOTE(review): presumably wrapped by a fixture/context-manager decorator
    that is not visible here - confirm.
    """
    from moler.threaded_moler_connection import ThreadedMolerConnection

    class FailingNetworkDownDetector(NetworkDownDetector):
        def data_received(self, data, recv_time):
            # guard clause: everything except the trigger goes to the parent
            if not data == fail_on_data:
                return super(FailingNetworkDownDetector, self).data_received(data, recv_time)
            raise fail_by_raising

    yield FailingNetworkDownDetector(connection=ThreadedMolerConnection(), runner=runner)

    # teardown: remove exceptions collected inside ConnectionObserver
    ConnectionObserver.get_unraised_exceptions(remove=True)