Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_filter_xml_sax_on_junos_rfc_compliant(self, mock_uuid4, mock_select, mock_session, mock_recv,
                                               mock_close, mock_send, mock_send_ready, mock_connected):
    """Run an RPC with ``use_filter`` enabled and check the filtered reply.

    The session is fed the contents of ``get-software-information.xml``
    through ``mock_recv`` and parsed with the parser returned by the device
    handler's ``get_xml_parser``.  The reply must contain two
    ``multi-routing-engine-item/re-name`` nodes and no
    ``software-information`` nodes.

    The ``mock_*`` parameters are presumably injected by ``patch``
    decorators that are outside this view -- TODO confirm.
    """
    mock_send.return_value = True
    mock_send_ready.return_value = -1
    # Fixed message id so the request/reply pair is deterministic.
    mock_uuid4.return_value = type('dummy', (), {'urn': "urn:uuid:e0a7abe3-fffa-11e5-b78e-b8e85604f858"})
    device_handler = manager.make_device_handler({'name': 'junos', 'use_filter': True})
    # NOTE(review): ``rpc`` and ``_filter_xml`` are empty strings here, yet
    # the assertions below expect a filtered reply.  These literals may have
    # lost their XML markup somewhere -- confirm against the upstream test.
    rpc = ''
    mock_recv.side_effect = self._read_file('get-software-information.xml')
    session = SSHSession(device_handler)
    session._connected = True
    session._channel = paramiko.Channel("c100")
    session.parser = session._device_handler.get_xml_parser(session)
    obj = ExecuteRpc(session, device_handler, raise_mode=RaiseMode.ALL)
    obj._filter_xml = ''
    session.run()
    # Reach into the name-mangled private document held by the NCElement reply.
    resp = obj.request(rpc)._NCElement__doc[0]
    self.assertEqual(len(resp.xpath('multi-routing-engine-item/re-name')), 2)
    # The filter does not mention software-information, so the response
    # must not contain it.
    self.assertEqual(len(resp.xpath('multi-routing-engine-item/software-information')), 0)
def _mock_manager(self, *args, **kwargs):
    """Stand-in used by the tests for both manager creation and RPC replies.

    * Called with keyword arguments and a positional element while
      ``normalize`` or ``filter_xml`` is among the keywords: return the
      content of ``<tag>.xml`` via ``self._read_file``.
    * Called with any other keyword arguments: build a ``Manager`` from
      ``kwargs['device_params']``.
    * Called with positional arguments only: return the content of
      ``<tag>.xml`` for the first argument's ``tag``.
    * Called with nothing: return ``None``.
    """
    if kwargs:
        if args and ('normalize' in kwargs or 'filter_xml' in kwargs):
            return self._read_file(args[0].tag + '.xml')
        device_params = kwargs['device_params']
        device_handler = make_device_handler(device_params)
        session = SSHSession(device_handler)
        return Manager(session, device_handler)
    if args:
        return self._read_file(args[0].tag + '.xml')
    # Explicit so every exit path of the function returns a value.
    return None
def test_getconf(self, mock_request, mock_session):
    """Check the element ``GetConfiguration.request`` hands to the request mock.

    Builds a ``filter/configuration/system`` element tree, issues the
    request with ``format='xml'`` and compares the tag and attributes of
    the first positional argument recorded by ``mock_request`` with a
    hand-built ``get-configuration`` node.

    ``mock_request`` and ``mock_session`` are presumably injected by
    ``patch`` decorators outside this view -- TODO confirm.
    """
    device_handler = manager.make_device_handler({'name': 'junos'})
    session = ncclient.transport.SSHSession(device_handler)
    obj = GetConfiguration(
        session,
        device_handler,
        raise_mode=RaiseMode.ALL)
    root_filter = new_ele('filter')
    config_filter = sub_ele(root_filter, 'configuration')
    system_filter = sub_ele(config_filter, 'system')
    obj.request(format='xml', filter=system_filter)
    # Expected node; only its tag and attributes are compared below.
    node = new_ele('get-configuration', {'format': 'xml'})
    node.append(system_filter)
    # First call -> positional args -> first positional argument.
    call = mock_request.call_args_list[0][0][0]
    self.assertEqual(call.tag, node.tag)
    self.assertEqual(call.attrib, node.attrib)
def _mock_manager_setup(self, *args, **kwargs):
    """Build a ``Manager`` from ``kwargs['device_params']``.

    Returns ``None`` when called without keyword arguments; positional
    arguments are ignored.
    """
    if kwargs:
        device_params = kwargs['device_params']
        device_handler = make_device_handler(device_params)
        session = SSHSession(device_handler)
        return Manager(session, device_handler)
    # Explicit so every exit path of the function returns a value.
    return None
def test_loadconf_text(self, mock_request, mock_session):
    """Check the element ``LoadConfiguration.request`` hands to the request mock.

    Issues a text-format merge load and compares the tag and attributes of
    the first positional argument recorded by ``mock_request`` with a
    hand-built ``load-configuration`` node.

    ``mock_request`` and ``mock_session`` are presumably injected by
    ``patch`` decorators outside this view -- TODO confirm.
    """
    device_handler = manager.make_device_handler({'name': 'junos'})
    session = ncclient.transport.SSHSession(device_handler)
    obj = LoadConfiguration(
        session,
        device_handler,
        raise_mode=RaiseMode.ALL)
    config = 'system { location floor 7; }'
    obj.request(format='text', action='merge', config=config)
    # Expected node; only its tag and attributes are compared below, the
    # ``configuration-text`` child is not asserted on.
    node = new_ele('load-configuration', {'format': 'text', 'action': 'merge'})
    sub_ele(node, 'configuration-text').text = config
    # First call -> positional args -> first positional argument.
    call = mock_request.call_args_list[0][0][0]
    self.assertEqual(call.tag, node.tag)
    self.assertEqual(call.attrib, node.attrib)
def _mock_manager(self, *args, **kwargs):
    """Stand-in used by the tests for both manager creation and RPC replies.

    * Called with keyword arguments: build a ``Manager`` from
      ``kwargs['device_params']``.
    * Called with positional arguments only: return the content of
      ``chassis_<tag>.xml`` for the first argument's ``tag`` via
      ``self._read_file``.
    * Called with nothing: return ``None``.
    """
    if kwargs:
        device_params = kwargs['device_params']
        device_handler = make_device_handler(device_params)
        session = SSHSession(device_handler)
        return Manager(session, device_handler)
    if args:
        return self._read_file('chassis_' + args[0].tag + '.xml')
    # Explicit so every exit path of the function returns a value.
    return None
def test_ncelement_reply_002(self):
    """Parse an rpc-reply into an ``NCElement`` and query it with XPath.

    Reads the reply text from ``test/reply1``, wraps it with the junos
    device handler's reply transform, and checks that three different
    XPath forms (descendant search, absolute path, relative path) all
    resolve ``host-name`` to ``"R1"``.
    """
    # Read in reply1 contents.
    with open('test/reply1', 'r') as f:
        reply = f.read()
    device_params = {'name': 'junos'}
    device_handler = manager.make_device_handler(device_params)
    transform_reply = device_handler.transform_reply()
    result = NCElement(reply, transform_reply)
    # XPath checks work.
    assert_equal(result.xpath("//host-name")[0].text, "R1")
    assert_equal(result.xpath("/rpc-reply/software-information/host-name")[0].text, "R1")
    assert_equal(result.xpath("software-information/host-name")[0].text, "R1")
def test_rpc_rpcerror(self, mock_thread, mock_send):
    """Expect ``RPC._request`` to raise ``RPCError`` after an error is delivered.

    An ``RPC`` object is created with ``raise_mode=RaiseMode.ALL`` and
    ``timeout=0``, given a canned reply, and handed an ``RPCError`` through
    ``deliver_error`` before ``_request`` is invoked with a
    ``commit/confirmed`` element.

    ``mock_thread`` and ``mock_send`` are presumably injected by ``patch``
    decorators outside this view -- TODO confirm.  ``xml1`` and ``xml2``
    are defined elsewhere in the module.
    """
    device_handler = manager.make_device_handler({'name': 'junos'})
    capabilities = Capabilities(device_handler.get_capabilities())
    session = ncclient.transport.Session(capabilities)
    obj = RPC(session, device_handler, raise_mode=RaiseMode.ALL, timeout=0)
    reply = RPCReply(xml1)
    obj._reply = reply
    node = new_ele("commit")
    sub_ele(node, "confirmed")
    err = RPCError(to_ele(xml2))
    obj.deliver_error(err)
    self.assertRaises(RPCError, obj._request, node)
def _mock_manager_setup(self, *args, **kwargs):
    """Build a ``Manager`` from ``kwargs['device_params']``.

    Returns ``None`` when called without keyword arguments; positional
    arguments are ignored.
    """
    if kwargs:
        device_params = kwargs['device_params']
        device_handler = make_device_handler(device_params)
        session = SSHSession(device_handler)
        return Manager(session, device_handler)
    # Explicit so every exit path of the function returns a value.
    return None
def test_validated_element_fail_2(self):
    """Expect ``validated_element`` to raise ``XMLError`` for ``self.reply``.

    ``self.reply`` is wrapped in an ``NCElement`` using the junos device
    handler's reply transform; its ``data_xml`` is then validated against
    the tags ``rpc-reply`` / ``rpc`` and the attribute lists ``attrib1`` /
    ``attrib2``.
    """
    device_params = {'name': 'junos'}
    device_handler = manager.make_device_handler(device_params)
    transform_reply = device_handler.transform_reply()
    result = NCElement(self.reply, transform_reply)
    # NOTE(review): this assigns on the XMLError class itself, so the value
    # remains set after this test finishes.
    XMLError.message = "Element does not meet requirement"
    result_xml = result.data_xml
    self.assertRaises(XMLError,
                      validated_element,
                      result_xml,
                      tags=[
                          "rpc-reply",
                          "rpc"],
                      attrs=[
                          ["attrib1"],
                          ["attrib2"]])