Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_repr_multi_warning(self):
    """Check that an RpcError built from a multi-entry RPCError reports 'warning'.

    An ncclient ``RPCError`` is created from ``warn_msg``, its ``errors``
    attribute is set to a two-element list of itself, and the result is
    wrapped in ``RpcError`` together with the parsed ``multi_warning_xml``.
    The test then asserts that ``rpc_error['severity']`` equals ``'warning'``.
    """
    reply_elem = etree.XML(multi_warning_xml)
    from ncclient.operations import RPCError
    # NOTE(review): this literal contains no XML tags; the markup was
    # presumably lost when the snippet was extracted -- confirm against the
    # upstream test before relying on it.
    warn_msg = '''
warning
statement not found
'''
    rpc_err = RPCError(etree.XML(warn_msg))
    # Make the error carry two sub-errors, both being the error itself.
    rpc_err.errors = [rpc_err, rpc_err]
    wrapped = RpcError(rsp=reply_elem, errs=rpc_err)
    self.assertEqual(wrapped.rpc_error['severity'], 'warning')
def _read_file(self, fname):
    """Load a canned reply from the ``rpc-reply`` directory beside this file.

    The file named ``fname`` is read as text and then handled according to
    its name:

    * error fixtures raise ``RPCError`` built from the parsed XML;
    * most XML fixtures (and ``get-route-information.json``) are wrapped in
      ``NCElement`` using the device handler's ``transform_reply()``;
    * ``show-configuration.xml`` / ``show-system-alarms.xml`` additionally
      unwrap the element's ``_NCElement__doc`` attribute;
    * ``show-interface-terse.json`` is decoded with ``json.loads``.

    NOTE(review): within the visible code ``rpc_reply`` is assigned but never
    returned, and unknown names fall through silently; the snippet looks
    truncated -- confirm against the full source.
    """
    from ncclient.xml_ import NCElement

    fpath = os.path.join(os.path.dirname(__file__), 'rpc-reply', fname)
    with open(fpath) as fp:
        foo = fp.read()

    raises_rpc_error = ('get-rpc-error.xml', 'get-permission-denied.xml')
    wrapped_in_ncelement = (
        'get-index-error.xml',
        'get-system-core-dumps.xml',
        'load-configuration-error.xml',
        'show-configuration-interfaces.xml',
        'show-interfaces-terse-asdf.xml',
        'get-route-information.json',
    )
    unwrapped_doc = ('show-configuration.xml', 'show-system-alarms.xml')

    if fname in raises_rpc_error:
        # Raise ncclient exception for error
        raise RPCError(etree.XML(foo))
    elif fname in wrapped_in_ncelement:
        rpc_reply = NCElement(
            foo, self.dev._conn._device_handler.transform_reply())
    elif fname in unwrapped_doc:
        rpc_reply = NCElement(
            foo,
            self.dev._conn._device_handler.transform_reply())._NCElement__doc
    elif fname == 'show-interface-terse.json':
        rpc_reply = json.loads(foo)
def test_tty_netconf_single_rpc_error(self, mock_rcv):
    """Expect ``RPCError`` from ``tty_net.rpc`` for the canned error reply.

    :param mock_rcv: mock object whose ``return_value`` is set to the reply
        text that the code under test will see.
    """
    # NOTE(review): the reply text below carries no XML tags; presumably they
    # were stripped when the snippet was extracted -- confirm upstream.
    mock_rcv.return_value = """
protocol
operation-failed
error
interface-ranges expansion failed
"""
    with self.assertRaises(RPCError):
        self.tty_net.rpc('commit-configuration')
def test_tty_netconf_multi_rpc_error(self, mock_rcv):
    """Expect ``RPCError`` from ``tty_net.rpc`` for 'commit-configuration.xml'.

    :param mock_rcv: mock object whose ``return_value`` is set to whatever
        ``self._read_file('commit-configuration.xml')`` returns.
    """
    canned_reply = self._read_file('commit-configuration.xml')
    mock_rcv.return_value = canned_reply
    with self.assertRaises(RPCError):
        self.tty_net.rpc('commit-configuration')
def load_configuration(self, *args, **kwargs):
    """Load the given configuration on the device.

    All positional and keyword arguments are forwarded unchanged to
    ``self.m.load_configuration``, except that a truthy ``config`` keyword
    is first passed through ``to_ele`` when ``format`` is ``'xml'`` (which
    is also the value assumed when ``format`` is not supplied).

    :format: Format of configuration (xml, text, set)
    :action: Action to be performed (merge, replace, override, update)
    :target: is the name of the configuration datastore being edited
    :config: is the configuration in string format.
    :return: the ``data_xml`` attribute of the reply.
    :raises Exception: when the call raises ``RPCError``; the message is
        ``to_xml(exc.xml)``.
    """
    payload = kwargs.get('config')
    wants_xml = kwargs.get('format', 'xml') == 'xml'
    if payload and wants_xml:
        kwargs['config'] = to_ele(payload)

    try:
        reply = self.m.load_configuration(*args, **kwargs)
        return reply.data_xml
    except RPCError as exc:
        raise Exception(to_xml(exc.xml))
rpc_cmd.__class__.__name__)
# invoking a bad RPC will cause a connection object exception
# which will be raised directly to the caller ... for now ...
# @@@ need to trap this and re-raise accordingly.
try:
rpc_rsp_e = self._rpc_reply(rpc_cmd_e,
ignore_warning=ignore_warning)
except NcOpErrors.TimeoutExpiredError:
# err is a TimeoutExpiredError from ncclient,
# which has no such attribute as xml.
raise EzErrors.RpcTimeoutError(self, rpc_cmd_e.tag, self.timeout)
except NcErrors.TransportError:
raise EzErrors.ConnectClosedError(self)
except RPCError as ex:
if hasattr(ex, 'xml'):
rsp = JXML.remove_namespaces(ex.xml)
message = rsp.findtext('error-message')
# see if this is a permission error
if message and message == 'permission denied':
raise EzErrors.PermissionError(cmd=rpc_cmd_e,
rsp=rsp,
errs=ex)
else:
rsp = None
raise EzErrors.RpcError(cmd=rpc_cmd_e,
rsp=rsp,
errs=ex)
# Something unexpected happened - raise it up
except Exception as err:
warnings.warn("An unknown exception occured - please report.",
def edit_config(self, config=None, format='xml', target='candidate', default_operation=None, test_option=None, error_option=None, remove_ns=False):
    """Send ``config`` to the device through ``self.m.edit_config``.

    :param config: configuration to send; must not be ``None``.
    :param format: forwarded as ``format``.
    :param target: forwarded as ``target``.
    :param default_operation: forwarded as ``default_operation``.
    :param test_option: forwarded as ``test_option``.
    :param error_option: forwarded as ``error_option``.
    :param remove_ns: when true, return ``remove_namespaces(reply)`` and
        not the reply's XML attribute.
    :return: ``remove_namespaces(reply)`` if ``remove_ns`` is set; otherwise
        the reply's ``data_xml`` attribute when it has one, else its ``xml``
        attribute.
    :raises ValueError: if ``config`` is ``None``.
    :raises Exception: when an ``RPCError`` is raised; the message is
        ``to_xml(exc.xml)``.
    """
    if config is None:
        raise ValueError('config value must be provided')

    try:
        reply = self.m.edit_config(
            config,
            format=format,
            target=target,
            default_operation=default_operation,
            test_option=test_option,
            error_option=error_option,
        )
        if remove_ns:
            return remove_namespaces(reply)
        # Prefer data_xml when the reply object offers it.
        if hasattr(reply, 'data_xml'):
            return reply.data_xml
        return reply.xml
    except RPCError as exc:
        raise Exception(to_xml(exc.xml))
config=data).xml
else:
response = self.handle.dispatch(ET.fromstring(data)).xml
"""
reply = 'NETCONF REQUEST:\n'
reply += '================\n\n'
if op == 'edit-config':
reply += 'EDIT-CONFIG LOCK: %s\n\n' % str(lock)
reply += str(parser) + '\n\n'
reply += 'NETCONF RESPONSE:\n'
reply += '================\n\n'
reply += ET.tostring(ET.fromstring(response), pretty_print=True)
"""
reply.append(ET.fromstring(response))
logging.debug("RECEIVE: \n=====\n%s\n=====\n" % response)
except RPCError as e:
reply.append(e._raw)
self.disconnect()
return reply
def load_configuration(self, format='xml', action='merge', target='candidate', config=None):
    """
    Load given configuration on device

    A truthy ``config`` is passed through ``to_ele`` first when ``format``
    is ``'xml'``; all four values are then forwarded as keyword arguments to
    ``self.m.load_configuration``.

    :param format: Format of configuration (xml, text, set)
    :param action: Action to be performed (merge, replace, override, update)
    :param target: The name of the configuration datastore being edited
    :param config: The configuration to be loaded on remote host in string format
    :return: the ``data_xml`` attribute of the reply
    :raises Exception: when the call raises ``RPCError``; the message is
        ``to_xml(exc.xml)``
    """
    if config and format == 'xml':
        config = to_ele(config)

    request = {
        'format': format,
        'action': action,
        'target': target,
        'config': config,
    }
    try:
        reply = self.m.load_configuration(**request)
        return reply.data_xml
    except RPCError as exc:
        raise Exception(to_xml(exc.xml))
# invoking a bad RPC will cause a connection object exception
# which will be raised directly to the caller ... for now ...
# @@@ need to trap this and re-raise accordingly.
try:
rpc_rsp_e = self._rpc_reply(rpc_cmd_e,
ignore_warning=ignore_warning,
filter_xml=kvargs.get(
'filter_xml'))
except NcOpErrors.TimeoutExpiredError:
# err is a TimeoutExpiredError from ncclient,
# which has no such attribute as xml.
raise EzErrors.RpcTimeoutError(self, rpc_cmd_e.tag, self.timeout)
except NcErrors.TransportError:
raise EzErrors.ConnectClosedError(self)
except RPCError as ex:
if hasattr(ex, 'xml'):
rsp = JXML.remove_namespaces(ex.xml)
message = rsp.findtext('error-message')
# see if this is a permission error
if message and message == 'permission denied':
raise EzErrors.PermissionError(cmd=rpc_cmd_e,
rsp=rsp,
errs=ex)
else:
rsp = None
raise EzErrors.RpcError(cmd=rpc_cmd_e,
rsp=rsp,
errs=ex)
# Something unexpected happened - raise it up
except Exception as err:
warnings.warn("An unknown exception occured - please report.",