Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_factory_has_buildin_constructors_active_by_default():
from moler.connection_factory import get_connection
conn = get_connection(io_type='memory', variant='threaded')
assert conn.__module__ == 'moler.io.raw.memory'
assert conn.__class__.__name__ == 'ThreadedFifoBuffer'
conn = get_connection(io_type='tcp', variant='threaded', host='localhost', port=2345)
assert conn.__module__ == 'moler.io.raw.tcp'
assert conn.__class__.__name__ == 'ThreadedTcp'
def test_connection_factory_can_use_alternate_login_param_of_sshshell():
from moler.connection_factory import get_connection
conn = get_connection(io_type='sshshell',
host='localhost', port=2222, login="moler", password="moler_passwd")
assert conn.__module__ == 'moler.io.raw.sshshell'
assert conn.__class__.__name__ == 'ThreadedSshShell'
def test_connection_factory_has_sshshell_constructor_active_by_default():
from moler.connection_factory import get_connection
conn = get_connection(io_type='sshshell', variant='threaded',
host='localhost', port=2222, username="moler", password="moler_passwd")
assert conn.__module__ == 'moler.io.raw.sshshell'
assert conn.__class__.__name__ == 'ThreadedSshShell'
assert hasattr(conn, 'moler_connection')
assert conn.sshshell.host == "localhost"
assert conn.sshshell.port == 2222
assert conn.sshshell.username == "moler"
assert conn.sshshell.password == "moler_passwd"
def test_can_select_connection_by_name(connections_config):
from moler.connection_factory import get_connection
connections_config.define_connection(name="www_server_1",
io_type='tcp',
host='localhost', port=2345)
connections_config.set_default_variant(io_type='tcp', variant='threaded')
conn = get_connection(name='www_server_1')
assert conn.__module__ == 'moler.io.raw.tcp'
assert conn.__class__.__name__ == 'ThreadedTcp'
assert conn.host == 'localhost'
assert conn.port == 2345
def test_get_connection_may_not_use_both__name_and_io_type():
from moler.connection_factory import get_connection
with pytest.raises(AssertionError) as err:
get_connection(name='www_server_1',
io_type='tcp', host='localhost', port=2345)
assert "Use either 'name' or 'io_type' parameter (not both)" in str(err.value)
def main(connections2observe4ip):
# Starting the clients
connections = []
for _, connection_name, ping_ip in connections2observe4ip:
# ------------------------------------------------------------------
# This front-end code hides all details of connection.
# We just use its name - such name should be meaningful for user.
# like: "main_dns_server", "backup_ntp_server", ...
# Another words, all we want here is stg like:
# "give me connection to main_dns_server"
# ------------------------------------------------------------------
tcp_connection = get_connection(name=connection_name)
tcp_connection.moler_connection.name = connection_name
client_thread = threading.Thread(target=ping_observing_task,
args=(tcp_connection, ping_ip))
client_thread.start()
connections.append(client_thread)
# await observers job to be done
for client_thread in connections:
client_thread.join()
def main(connections2observe4ip):
# Starting the clients
connections = []
for address, ping_ip in connections2observe4ip:
host, port = address
# ------------------------------------------------------------------
# This front-end code hides parallelism variant
# used to read data from connection.
# We don't care if it is TCP connection based on threads or asyncio.
# All we want here is "any TCP connection towards given host/port".
# "any" means here: TCP variant as configured on backend.
# ------------------------------------------------------------------
tcp_connection = get_connection(io_type='tcp', host=host, port=port)
client_thread = threading.Thread(target=ping_observing_task,
args=(tcp_connection, ping_ip))
client_thread.start()
connections.append(client_thread)
# await observers job to be done
for client_thread in connections:
client_thread.join()
def main(connections2observe4ip):
# Starting the clients
connections = []
for address, ping_ip in connections2observe4ip:
host, port = address
# ------------------------------------------------------------------
# This front-end code hides parallelism variant
# used to read data from connection.
# We don't care if it is TCP connection based on threads or asyncio.
# All we want here is "any TCP connection towards given host/port".
# "any" means here: TCP variant as configured on backend.
# ------------------------------------------------------------------
tcp_connection = get_connection(io_type='tcp', host=host, port=port)
tcp_connection.moler_connection.name = "{}:{}".format(host, port)
client_thread = threading.Thread(target=ping_observing_task,
args=(tcp_connection, ping_ip))
client_thread.start()
connections.append(client_thread)
# await observers job to be done
for client_thread in connections:
client_thread.join()
def main(connections2observe4ip):
# Starting the clients
connections = []
for _, connection_name, ping_ip in connections2observe4ip:
# ------------------------------------------------------------------
# This front-end code hides all details of connection.
# We just use its name - such name should be meaningful for user.
# like: "main_dns_server", "backup_ntp_server", ...
# Another words, all we want here is stg like:
# "give me connection to main_dns_server"
# ------------------------------------------------------------------
tcp_connection = get_connection(name=connection_name)
tcp_connection.moler_connection.name = connection_name
client_thread = threading.Thread(target=ping_observing_task,
args=(tcp_connection, ping_ip))
client_thread.start()
connections.append(client_thread)
# await observers job to be done
for client_thread in connections:
client_thread.join()
def main(connections2observe4ip):
# Starting the clients
connections = []
for address, ping_ip in connections2observe4ip:
host, port = address
# ------------------------------------------------------------------
# This front-end code hides parallelism variant
# used to read data from connection.
# We don't care if it is TCP connection based on threads or asyncio.
# All we want here is "any TCP connection towards given host/port".
# "any" means here: TCP variant as configured on backend.
# ------------------------------------------------------------------
tcp_connection = get_connection(io_type='tcp', host=host, port=port)
client_thread = threading.Thread(target=ping_observing_task,
args=(tcp_connection, ping_ip))
client_thread.start()
connections.append(client_thread)
# await observers job to be done
for client_thread in connections:
client_thread.join()