Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
unix.establish_connection()
unix.goto_state(UnixLocal.unix_local)
rets = {'ping': None, 'whoami': None}
def callback_response():
cmd_whoami = unix.get_cmd(cmd_name="whoami")
rets['whoami'] = cmd_whoami()
event_reconnect = unix.get_event(event_name="ping_response", event_params={})
event_reconnect.add_event_occurred_callback(
callback=callback_response,
)
event_reconnect.start()
cmd_ping = unix.get_cmd(cmd_name="ping", cmd_params={'destination': '127.0.0.1', 'options': '-c 1'})
rets['ping'] = cmd_ping(timeout=5)
MolerTest.sleep(1)
assert rets['ping'] is not None
assert rets['whoami'] is not None
def test_can_send_data_into_ext_io_from_moler_connection(memory_connection_without_decoder):
connection = memory_connection_without_decoder
moler_conn = connection.moler_connection
received_data = bytearray()
def receiver(data, time_recv):
received_data.extend(data)
moler_conn.subscribe(receiver, connection_closed_handler)
with connection.open():
moler_conn.send("command to be echoed")
connection.read()
MolerTest.sleep(1)
assert b'command to be echoed' == received_data
def callback_long(param_dict):
param_dict['number'] += 1
MolerTest.sleep(seconds=0.12)
def feed_in_separate_thread():
while run:
buffer_connection.moler_connection.data_received("abcde\nfghi\njkl".encode("utf-8"), datetime.datetime.now())
MolerTest.sleep(sleep_time/10)
from threading import Thread
tf = Thread(target=feed_in_separate_thread)
tf.setDaemon(True)
tf.start()
start_time = time.time()
while time.time() - start_time < 4 or processed['process'] < 300:
event.pause()
MolerTest.sleep(sleep_time)
event.resume()
MolerTest.sleep(sleep_time)
event.resume()
run = False
MolerTest.sleep(0.2)
buffer_connection.moler_connection.data_received(output.encode("utf-8"), datetime.datetime.now())
buffer_connection.moler_connection.data_received(output.encode("utf-8"), datetime.datetime.now())
event.await_done(timeout=1)
assert event.done() is True
buffer_connection.moler_connection.data_received("abcde\nfghi\njkl".encode("utf-8"), datetime.datetime.now())
MolerTest.sleep(sleep_time/10)
from threading import Thread
tf = Thread(target=feed_in_separate_thread)
tf.setDaemon(True)
tf.start()
start_time = time.time()
while time.time() - start_time < 4 or processed['process'] < 300:
event.pause()
MolerTest.sleep(sleep_time)
event.resume()
MolerTest.sleep(sleep_time)
event.resume()
run = False
MolerTest.sleep(0.2)
buffer_connection.moler_connection.data_received(output.encode("utf-8"), datetime.datetime.now())
buffer_connection.moler_connection.data_received(output.encode("utf-8"), datetime.datetime.now())
event.await_done(timeout=1)
assert event.done() is True
def test_inject_response_awaits_nearest_write_before_responding(memory_connection_without_decoder):
connection = memory_connection_without_decoder
moler_conn = connection.moler_connection
received_data = bytearray()
def receiver(data, time_recv):
received_data.extend(data)
moler_conn.subscribe(receiver, connection_closed_handler)
with connection.open():
connection.inject_response(input_bytes=[b'response\n'])
connection.read()
MolerTest.sleep(1)
assert b'' == received_data # injection not active yet
connection.write(b'request\n')
connection.read()
MolerTest.sleep(1)
assert b'request\nresponse\n' == received_data
def on_new_line(self, line, is_full_line):
if self.raise_unicode:
self.nr += 1
exc = UnicodeDecodeError("utf-8", b'abcdef', 0, 1, "Unknown")
raise exc
super(CdUnicodeError, self).on_new_line(line, is_full_line)
cmd = CdUnicodeError(connection=buffer_connection.moler_connection, path="/home/user/")
cmd_start_string = "{}\n".format(cmd.command_string)
cmd.start()
MolerTest.sleep(sleep_time)
buffer_connection.moler_connection.data_received(cmd_start_string.encode("utf-8"), datetime.datetime.now())
MolerTest.sleep(sleep_time)
cmd._ignore_unicode_errors = False
cmd.raise_unicode = True
MolerTest.sleep(sleep_time)
buffer_connection.moler_connection.data_received("abc".encode("utf-8"), datetime.datetime.now())
MolerTest.sleep(sleep_time)
cmd.raise_unicode = False
MolerTest.sleep(sleep_time)
buffer_connection.moler_connection.data_received(command_output.encode("utf-8"), datetime.datetime.now())
MolerTest.sleep(sleep_time)
with pytest.raises(CommandFailure):
cmd.await_done()
cmd = CdUnicodeError(connection=buffer_connection.moler_connection, path="/home/user/")
cmd.start()
MolerTest.sleep(sleep_time)
buffer_connection.moler_connection.data_received(cmd_start_string.encode("utf-8"), datetime.datetime.now())
MolerTest.sleep(sleep_time)
cmd._ignore_unicode_errors = True
cmd.raise_unicode = True
def test_job():
values = {'number': 0}
job = Scheduler.get_job(callback=callback, interval=0.1, callback_params={'param_dict': values})
job.start()
MolerTest.sleep(seconds=0.22)
job.cancel()
assert(2 == values['number'])
max_timeout = 5.0
def callback_fun(param):
param['nr'] += 1
output = "From 192.168.255.126 icmp_seq=1 Destination Host Unreachable"
event = PingNoResponse(connection=buffer_connection.moler_connection, till_occurs_times=2)
event.add_event_occurred_callback(callback=callback_fun, callback_params={'param': counter})
assert 0 == counter['nr']
event.start()
buffer_connection.moler_connection.data_received(output.encode("utf-8"), datetime.datetime.now())
start_time = time.time()
while time.time() - start_time <= max_timeout:
if 1 == counter['nr']:
break
MolerTest.sleep(sleep_time)
assert 1 == counter['nr']
event.pause()
buffer_connection.moler_connection.data_received(output.encode("utf-8"), datetime.datetime.now())
MolerTest.sleep(sleep_time)
assert 1 == counter['nr']
event.resume()
buffer_connection.moler_connection.data_received(output.encode("utf-8"), datetime.datetime.now())
event.await_done()
start_time = time.time()
while time.time() - start_time <= max_timeout:
if 2 == counter['nr']:
break
MolerTest.sleep(sleep_time)
assert 2 == counter['nr']
assert event.done() is True