Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def device_factory():
from moler.device.device import DeviceFactory as dev_factory
# restore since tests may change configuration
clear_all_cfg()
yield dev_factory
# restore since tests may change configuration
clear_all_cfg()
def clear_all_cfg():
import moler.config as moler_cfg
import moler.config.devices as dev_cfg
from moler.device.device import DeviceFactory as dev_factory
moler_cfg.clear()
dev_cfg.clear()
dev_factory._clear()
import os
from moler.config import load_config
from moler.device.device import DeviceFactory
load_config(config=os.path.join(os.path.dirname(__file__), 'my_devices.yml'))
my_unix = DeviceFactory.get_device(name='MyMachine')
host = 'www.google.com'
ping_cmd = my_unix.get_cmd(cmd_name="ping", cmd_params={"destination": host, "options": "-w 6"})
remote_unix = DeviceFactory.get_device(name='RebexTestMachine')
remote_unix.goto_state(state="UNIX_REMOTE")
ls_cmd = remote_unix.get_cmd(cmd_name="ls", cmd_params={"options": "-l"})
print("Start pinging {} ...".format(host))
ping_cmd.start() # run command in background
print("Let's check readme.txt at {} while pinging {} ...".format(remote_unix.name, host))
remote_files = ls_cmd() # foreground "run in the meantime"
file_info = remote_files['files']['readme.txt']
print("readme.txt file: owner={fi[owner]}, size={fi[size_bytes]}".format(fi=file_info))
ping_stats = ping_cmd.await_done(timeout=6) # await background command
def test_network_outage():
load_config(config=os.path.abspath('config/my_devices.yml'))
unix1 = DeviceFactory.get_device(name='MyMachine1')
unix2 = DeviceFactory.get_device(name='MyMachine2')
# test setup - ensure network is up before running test
ifconfig_up = unix2.get_cmd(cmd_name="ifconfig", cmd_params={"options": "lo up"})
sudo_ifconfig_up = unix2.get_cmd(cmd_name="sudo", cmd_params={"password": "moler", "cmd_object": ifconfig_up})
sudo_ifconfig_up()
# run test
ping = unix1.get_cmd(cmd_name="ping", cmd_params={"destination": "localhost", "options": "-O"})
ping.start(timeout=120)
time.sleep(3)
ifconfig_down = unix2.get_cmd(cmd_name="ifconfig", cmd_params={"options": "lo down"})
sudo_ifconfig_down = unix2.get_cmd(cmd_name="sudo", cmd_params={"password": "moler", "cmd_object": ifconfig_down})
sudo_ifconfig_down()
def _is_device_creation_needed(name, requested_device_def):
"""
:param name: Name of device
:param requested_device_def: Definition of device requested to create/
:return: True if device doesn't exist. False if device already exists.
:
"""
from moler.device.device import DeviceFactory
try:
DeviceFactory.get_device(name, establish_connection=False)
msg = DeviceFactory.differences_between_devices_descriptions(name, requested_device_def)
if msg:
raise WrongUsage(msg)
return False # Device exists and have the same construct parameters
except KeyError:
return True
import os
from moler.config import load_config
from moler.device.device import DeviceFactory
load_config(config=os.path.join(os.path.dirname(__file__), 'my_devices.yml')) # description of available devices
# load_config(config={'DEVICES': {'MyMachine': {'DEVICE_CLASS': 'moler.device.unixremote.UnixLocal'}}},
# config_type='dict')
my_unix = DeviceFactory.get_device(name='MyMachine') # take specific device out of available ones
ps_cmd = my_unix.get_cmd(cmd_name="ps", # take command of that device
cmd_params={"options": "-ef"})
processes_info = ps_cmd() # run the command, it returns result
for proc_info in processes_info:
if 'python' in proc_info['CMD']:
print("PID: {info[PID]} CMD: {info[CMD]}".format(info=proc_info))
"""
PID: 1817 CMD: /usr/bin/python /usr/share/system-config-printer/applet.py
def test_network_outage():
load_config(config=os.path.abspath('config/my_devices.yml'))
unix1 = DeviceFactory.get_device(name='MyMachine1')
unix2 = DeviceFactory.get_device(name='MyMachine2')
ping = unix1.get_cmd(cmd_name="ping", cmd_params={"destination": "localhost", "options": "-O"})
ping.start(timeout=120)
time.sleep(3)
ifconfig_down = unix2.get_cmd(cmd_name="ifconfig", cmd_params={"options": "lo down"})
sudo_ifconfig_down = unix2.get_cmd(cmd_name="sudo", cmd_params={"password": "moler", "cmd_object": ifconfig_down})
sudo_ifconfig_down()
time.sleep(5)
ifconfig_up = unix2.get_cmd(cmd_name="ifconfig", cmd_params={"options": "lo up"})
sudo_ifconfig_up = unix2.get_cmd(cmd_name="sudo", cmd_params={"password": "moler", "cmd_object": ifconfig_up})
sudo_ifconfig_up()
time.sleep(3)
# TODO: problem for {'CONNECTION_DESC': {'io_type': 'terminal', 'variant': 'asyncio'}},
# TODO: get_device() uses io.open() and not await open()
# async def do_async_device_ls():
# remote_unix = await DeviceFactory.get_device_coro(name='RebexTestMachine')
# remote_unix = await AsyncDeviceFactory.get_device(name='RebexTestMachine')
# # TODO: + textualdevice should have separate __init__() and async def open()
# await remote_unix.goto_state_coro(state="UNIX_REMOTE")
# ls_cmd = remote_unix.get_cmd(cmd_name="ls", cmd_params={"options": "-l"})
# remote_files = await ls_cmd
#
# run_via_asyncio(do_async_device_ls())
remote_unix = DeviceFactory.get_device(name='RebexTestMachine') # it starts in local shell
remote_unix.goto_state(state="UNIX_REMOTE") # make it go to remote shell
ls_cmd = remote_unix.get_cmd(cmd_name="ls", cmd_params={"options": "-l"})
remote_files = ls_cmd()
if 'readme.txt' in remote_files['files']:
print("readme.txt file:")
readme_file_info = remote_files['files']['readme.txt']
for attr in readme_file_info:
print(" {:<18}: {}".format(attr, readme_file_info[attr]))
# result:
"""
readme.txt file:
def differences_between_devices_descriptions(cls, already_device_name, requested_device_def):
"""
Checks if two device description are the same.
:param already_device_name: Name of device already created by Moler
:param requested_device_def: Description od device provided to create. The name is the same as above.
:return: Empty string if descriptions are the same, if not the string with differences.
"""
already_created_device = cls.get_device(already_device_name, establish_connection=False)
already_device_def = copy_dict(DeviceFactory._devices_params[already_created_device.name], True)
different_msg = ""
already_full_class = already_device_def['class_fullname']
current_full_class = requested_device_def['DEVICE_CLASS']
if already_full_class == current_full_class:
default_hops = dict()
already_hops = already_device_def['constructor_parameters']['sm_params'].get('CONNECTION_HOPS',
default_hops)
current_hops = requested_device_def.get('CONNECTION_HOPS', default_hops)
diff = compare_objects(already_hops, current_hops)
if diff:
different_msg = "Device '{}' already created with SM parameters: '{}' but now requested with SM" \
" params: {}. \nDiff: {}".format(already_device_name, already_hops, current_hops, diff)
else:
different_msg = "Device '{}' already created as instance of class '{}' and now requested as instance of " \
"class '{}'".format(already_device_name, already_full_class, current_full_class)
def test_network_outage():
load_config(config=os.path.abspath('config/my_devices.yml'))
unix1 = DeviceFactory.get_device(name='MyMachine1')
unix2 = DeviceFactory.get_device(name='MyMachine2')
#######################################################
# TEST GOAL: network outage should not exceed 3 seconds
#######################################################
# test setup - prepare everything required by test
ping_times = {"lost_connection_time": 0,
"reconnection_time": 0}
# ensure network is up before running test
net_up = unix2.get_cmd(cmd_name="ifconfig", cmd_params={"options": "lo up"})
ensure_interfaces_up = unix2.get_cmd(cmd_name="sudo", cmd_params={"password": "moler", "cmd_object": net_up})
# run event observing "network down/up"
ping_lost_detector = unix1.get_event(event_name="ping_no_response", event_params={"till_occurs_times": 1})
ping_lost_detector.add_event_occurred_callback(callback=outage_callback,
callback_params={'device_name': 'MyMachine1',
'ping_times': ping_times})