How to use the ncclient.manager.make_device_handler function in ncclient

To help you get started, we’ve selected a few ncclient examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github ncclient / ncclient / test / unit / transport / test_parser.py View on Github external
def test_filter_xml_sax_on_junos_rfc_compliant(self, mock_uuid4, mock_select, mock_session, mock_recv,
                              mock_close, mock_send, mock_send_ready, mock_connected):
        mock_send.return_value = True
        mock_send_ready.return_value = -1
        mock_uuid4.return_value = type('dummy', (), {'urn': "urn:uuid:e0a7abe3-fffa-11e5-b78e-b8e85604f858"})
        device_handler = manager.make_device_handler({'name': 'junos', 'use_filter': True})
        rpc = ''
        mock_recv.side_effect = self._read_file('get-software-information.xml')
        session = SSHSession(device_handler)
        session._connected = True
        session._channel = paramiko.Channel("c100")
        session.parser = session._device_handler.get_xml_parser(session)
        obj = ExecuteRpc(session, device_handler, raise_mode=RaiseMode.ALL)
        obj._filter_xml = ''
        session.run()
        resp = obj.request(rpc)._NCElement__doc[0]
        from lxml import etree
        print(resp)
        self.assertEqual(len(resp.xpath('multi-routing-engine-item/re-name')), 2)
        # as filter_xml is not having software-information, response wont contain it
        self.assertEqual(len(resp.xpath('multi-routing-engine-item/software-information')), 0)
github Juniper / py-junos-eznc / tests / unit / factory / test_optable.py View on Github external
def _mock_manager(self, *args, **kwargs):
        if kwargs:
            if args and ('normalize' in kwargs or 'filter_xml' in kwargs):
                return self._read_file(args[0].tag + '.xml')
            device_params = kwargs['device_params']
            device_handler = make_device_handler(device_params)
            session = SSHSession(device_handler)
            return Manager(session, device_handler)

        if args:
            return self._read_file(args[0].tag + '.xml')
github ncclient / ncclient / test / unit / operations / third_party / juniper / test_rpc.py View on Github external
def test_getconf(self, mock_request, mock_session):
        device_handler = manager.make_device_handler({'name': 'junos'})
        session = ncclient.transport.SSHSession(device_handler)
        obj = GetConfiguration(
            session,
            device_handler,
            raise_mode=RaiseMode.ALL)
        root_filter = new_ele('filter')
        config_filter = sub_ele(root_filter, 'configuration')
        system_filter = sub_ele(config_filter, 'system')
        obj.request(format='xml', filter=system_filter)
        node = new_ele('get-configuration', {'format': 'xml'})
        node.append(system_filter)
        call = mock_request.call_args_list[0][0][0]
        self.assertEqual(call.tag, node.tag)
        self.assertEqual(call.attrib, node.attrib)
github Juniper / py-junos-eznc / tests / unit / facts / test_ifd_style.py View on Github external
def _mock_manager_setup(self, *args, **kwargs):
        if kwargs:
            device_params = kwargs['device_params']
            device_handler = make_device_handler(device_params)
            session = SSHSession(device_handler)
            return Manager(session, device_handler)
github ncclient / ncclient / test / unit / operations / third_party / juniper / test_rpc.py View on Github external
def test_loadconf_text(self, mock_request, mock_session):
        device_handler = manager.make_device_handler({'name': 'junos'})
        session = ncclient.transport.SSHSession(device_handler)
        obj = LoadConfiguration(
            session,
            device_handler,
            raise_mode=RaiseMode.ALL)
        config = 'system { location floor 7; }'
        obj.request(format='text', action='merge', config=config)
        node = new_ele('load-configuration', {'format': 'text', 'action': 'merge'})
        sub_ele(node, 'configuration-text').text = config
        call = mock_request.call_args_list[0][0][0]
        self.assertEqual(call.tag, node.tag)
        self.assertEqual(call.attrib, node.attrib)
github Juniper / py-junos-eznc / tests / unit / facts / test_chassis.py View on Github external
def _mock_manager(self, *args, **kwargs):
        if kwargs:
            device_params = kwargs['device_params']
            device_handler = make_device_handler(device_params)
            session = SSHSession(device_handler)
            return Manager(session, device_handler)

        if args:
            return self._read_file('chassis_' + args[0].tag + '.xml')
github ncclient / ncclient / test / test_xml_.py View on Github external
def test_ncelement_reply_002(self):
        """test parse rpc_reply and xpath"""
        #read in reply1 contents
        with open('test/reply1', 'r') as f:
            reply = f.read()
        device_params = {'name':'junos'}
        device_handler = manager.make_device_handler(device_params)
        transform_reply = device_handler.transform_reply()
        result = NCElement(reply, transform_reply)
        #XPATH checks work
        assert_equal(result.xpath("//host-name")[0].text,"R1")
        assert_equal(result.xpath("/rpc-reply/software-information/host-name")[0].text,"R1")
        assert_equal(result.xpath("software-information/host-name")[0].text,"R1")
github ncclient / ncclient / test / unit / operations / test_rpc.py View on Github external
def test_rpc_rpcerror(self, mock_thread, mock_send):
        device_handler = manager.make_device_handler({'name': 'junos'})
        capabilities = Capabilities(device_handler.get_capabilities())
        session = ncclient.transport.Session(capabilities)
        obj = RPC(session, device_handler, raise_mode=RaiseMode.ALL, timeout=0)
        reply = RPCReply(xml1)
        obj._reply = reply
        node = new_ele("commit")
        sub_ele(node, "confirmed")

        err = RPCError(to_ele(xml2))
        obj.deliver_error(err)
        self.assertRaises(RPCError, obj._request, node)
github Juniper / py-junos-eznc / tests / unit / facts / test_iri_mapping.py View on Github external
def _mock_manager_setup(self, *args, **kwargs):
        if kwargs:
            device_params = kwargs['device_params']
            device_handler = make_device_handler(device_params)
            session = SSHSession(device_handler)
            return Manager(session, device_handler)
github ncclient / ncclient / test / unit / test_xml_.py View on Github external
def test_validated_element_fail_2(self):
        device_params = {'name': 'junos'}
        device_handler = manager.make_device_handler(device_params)
        transform_reply = device_handler.transform_reply()
        result = NCElement(self.reply, transform_reply)
        XMLError.message = "Element does not meet requirement"
        result_xml = result.data_xml
        self.assertRaises(XMLError,
            validated_element,
                result_xml,
                tags=[
                    "rpc-reply",
                    "rpc"],
                attrs=[
                    ["attrib1"],
                    ["attrib2"]])