Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_discard_changes(self, mock_request):
    """Verify that DiscardChanges.request() hands off a <discard-changes> element.

    ``mock_request`` is the injected mock; the first positional argument of
    its first recorded call is compared, in serialized form, against a
    freshly built ``discard-changes`` element.
    """
    ssh_session = ncclient.transport.SSHSession(self.device_handler)
    ssh_session._server_capabilities = [':candidate', ":confirmed-commit"]
    operation = DiscardChanges(
        ssh_session, self.device_handler, raise_mode=RaiseMode.ALL
    )
    operation.request()

    expected_xml = ElementTree.tostring(new_ele("discard-changes"))

    # call_args_list[0] -> first call, [0] -> positional args, [0] -> first arg
    sent_element = mock_request.call_args_list[0][0][0]
    self.assertEqual(ElementTree.tostring(sent_element), expected_xml)
def test_get(self, mock_request):
    """Verify that Get.request() wraps the supplied filter in a <get> element.

    A filter tree ``filter/configuration/system/services`` is built, a deep
    copy of it is passed to ``request()``, and the original tree is reused
    to construct the expected ``<get>`` element via ``util.build_filter``.
    """
    ssh_session = ncclient.transport.SSHSession(self.device_handler)
    operation = Get(ssh_session, self.device_handler, raise_mode=RaiseMode.ALL)

    # Build filter -> configuration -> system -> services
    filter_root = new_ele('filter')
    system_node = sub_ele(sub_ele(filter_root, 'configuration'), 'system')
    sub_ele(system_node, 'services')

    operation.request(copy.deepcopy(filter_root))

    expected_node = new_ele("get")
    expected_node.append(util.build_filter(filter_root))
    expected_xml = ElementTree.tostring(expected_node)

    # First positional argument of the first recorded call on the mock.
    sent_element = mock_request.call_args_list[0][0][0]
    self.assertEqual(ElementTree.tostring(sent_element), expected_xml)
def test_subscribe_all(self, mock_request):
    """Verify that CreateSubscription.request() with no arguments sends an
    empty <create-subscription> element in the notification namespace.
    """
    ssh_session = ncclient.transport.SSHSession(self.device_handler)
    ssh_session._server_capabilities = [":notification"]
    subscription = CreateSubscription(
        ssh_session, self.device_handler, raise_mode=RaiseMode.ALL
    )
    subscription.request()

    expected_xml = ElementTree.tostring(
        new_ele_ns("create-subscription", NETCONF_NOTIFICATION_NS)
    )

    # First positional argument of the first recorded call on the mock.
    sent_element = mock_request.call_args_list[0][0][0]
    self.assertEqual(ElementTree.tostring(sent_element), expected_xml)
def test_rpc_timeout_error(self, mock_thread, mock_send):
    """Verify that RPC._request() raises TimeoutExpiredError.

    The RPC is created with ``timeout=0`` on a 'junos' device handler, a
    reply built from ``xml1`` is delivered to it, and ``mock_thread`` is set
    to return ``False`` before ``_request`` is called with a
    ``commit/confirmed`` element.
    """
    junos_handler = manager.make_device_handler({'name': 'junos'})
    client_caps = Capabilities(junos_handler.get_capabilities())
    plain_session = ncclient.transport.Session(client_caps)
    rpc = RPC(plain_session, junos_handler, raise_mode=RaiseMode.ALL, timeout=0)
    rpc.deliver_reply(RPCReply(xml1))

    commit_node = new_ele("commit")
    sub_ele(commit_node, "confirmed")

    mock_thread.return_value = False
    with self.assertRaises(TimeoutExpiredError):
        rpc._request(commit_node)
def test_command(self, mock_request, mock_session):
    """Verify that Command.request() sends a <command> element whose tag and
    text match the CLI string that was requested.

    Only the tag and text of the sent element are compared; ``mock_session``
    is accepted but not used in the body.
    """
    junos_handler = manager.make_device_handler({'name': 'junos'})
    ssh_session = ncclient.transport.SSHSession(junos_handler)
    operation = Command(ssh_session, junos_handler, raise_mode=RaiseMode.ALL)

    cli_text = 'show system users'
    output_format = 'text'  # local renamed so the builtin format() is not shadowed
    operation.request(command=cli_text, format=output_format)

    expected_node = new_ele('command', {'format': output_format})
    expected_node.text = cli_text

    # First positional argument of the first recorded call on the mock.
    sent_element = mock_request.call_args_list[0][0][0]
    self.assertEqual(sent_element.tag, expected_node.tag)
    self.assertEqual(sent_element.text, expected_node.text)
def test_subscribe_times(self, mock_request):
    """Verify that CreateSubscription.request() with start/stop times sends
    <startTime> and <stopTime> children in the notification namespace.

    NOTE(review): ``start_time`` and ``stop_time`` are not assigned anywhere
    in this block; they must come from an enclosing scope or from lines
    missing from this excerpt -- confirm before relying on this test.
    """
    ssh_session = ncclient.transport.SSHSession(self.device_handler)
    ssh_session._server_capabilities = [":notification"]
    subscription = CreateSubscription(
        ssh_session, self.device_handler, raise_mode=RaiseMode.ALL
    )
    subscription.request(filter=None, start_time=start_time, stop_time=stop_time)

    expected_node = new_ele_ns("create-subscription", NETCONF_NOTIFICATION_NS)
    start_node = sub_ele_ns(expected_node, "startTime", NETCONF_NOTIFICATION_NS)
    start_node.text = start_time
    stop_node = sub_ele_ns(expected_node, "stopTime", NETCONF_NOTIFICATION_NS)
    stop_node.text = stop_time
    expected_xml = ElementTree.tostring(expected_node)

    # First positional argument of the first recorded call on the mock.
    sent_element = mock_request.call_args_list[0][0][0]
    self.assertEqual(ElementTree.tostring(sent_element), expected_xml)
str(e)
)
return
nckwargs = dict(
host=module.params['host'],
port=module.params['port'],
hostkey_verify=module.params['hostkey_verify'],
username=module.params['username'],
password=module.params['password'],
)
retkwargs = dict()
try:
m = ncclient.manager.connect(**nckwargs)
except ncclient.transport.errors.AuthenticationError:
module.fail_json(
msg='authentication failed while connecting to device'
)
except:
e = get_exception()
module.fail_json(
msg='error connecting to the device: ' +
str(e)
)
return
retkwargs['server_capabilities'] = list(m.server_capabilities)
try:
changed = netconf_edit_config(
m=m,
xml=module.params['xml'],
commit=True,
`device_params={'name': '<device_name>'}` in connection parameters. At present,
'junos' and 'nexus' are supported for Juniper and Cisco Nexus respectively.
A custom device handler can be provided with
`device_params={'handler': <handler class>}` in connection parameters.
"""
# Extract device parameter and manager parameter dictionaries, if they were passed into this function.
# Remove them from kwds (which should keep only session.connect() parameters).
device_params = _extract_device_params(kwds)
manager_params = _extract_manager_params(kwds)
device_handler = make_device_handler(device_params)
device_handler.add_additional_ssh_connect_params(kwds)
global VENDOR_OPERATIONS
VENDOR_OPERATIONS.update(device_handler.add_additional_operations())
session = transport.SSHSession(device_handler)
if "hostkey_verify" not in kwds or kwds["hostkey_verify"]:
session.load_known_hosts()
try:
session.connect(*args, **kwds)
except Exception as ex:
if session.transport:
session.close()
raise
return Manager(session, device_handler, **manager_params)
try:
if handler == 'get_config':
call = ET.tostring(call.getchildren()[0])
return self._mgr.get(filter=('subtree', call))
call = ET.tostring(call)
if handler == 'get':
call_element = xml_.to_ele(call)
return ET.fromstring(str(self._mgr.dispatch(call_element)))
if handler == 'edit_config':
self._mgr.edit_config(target=target, config=call)
if handler == 'delete_config':
self._mgr.delete_config(target=target)
if handler == 'copy_config':
self._mgr.copy_config(target=target, source=source)
except (ncclient.transport.TransportError,
ncclient.transport.SessionCloseError,
ncclient.transport.SSHError,
ncclient.transport.AuthenticationError,
ncclient.transport.SSHUnknownHostError) as error:
logging.error(error)
raise DeviceCommError
rpc = self.__remove_namespace(xsd_fetch, self.__NETCONF_NAMESPACE)
# Show what the generated RPC message looks like
print "GENERATED RPC MESSAGE:\n"
print(etree.tostring(rpc,pretty_print=True))
# Send the generated RPC XML to the server.
# Without .xml, the reply has the GetReply type;
# with .xml, the reply is converted to an XML string.
try:
rpc_reply = self.__connection.dispatch(rpc).xml
except (ncclient.operations.rpc.RPCError) as e:
self.__logger.info("ncclient: RPCError: %s" % e)
raise RPCError(e)
return None
except (ncclient.transport.TransportError) as e:
self.__logger.info("ncclient: TransportError % s" % e)
#self.connect()
raise RPCError(e)
return None
except (ncclient.operations.rpc.OperationError) as e:
self.__logger.info("ncclient: OperationError: %s" % e)
# self.__logger.info("function (connect)Connected: %s"
# % self.__connection.connected)
# self.connect()
raise RPCError(e)
return None
# Store this XML string on the instance so that
# the most recent RPC reply is always accessible
self.__rpc_reply_as_xml = rpc_reply