Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
warning
statement not found
"""
rsp = XML(rsp_string)
errors = []
for err in rsp.findall('.//'+qualify('rpc-error')):
errors.append(RPCError(err))
raise RPCError(rsp, errs=errors)
warning
statement not found
"""
rsp = XML(rsp_string)
errors = []
for err in rsp.findall('.//'+qualify('rpc-error')):
errors.append(RPCError(err))
raise RPCError(rsp, errs=errors)
def method(self, x):
rpc_error = RPCError(XML(''), errs=None)
raise rpc_error
decorator = ignoreWarnDecorator(method)
warning
foo bar
"""
rsp = XML(rsp_string)
errors = []
for err in rsp.findall('.//'+qualify('rpc-error')):
errors.append(RPCError(err))
raise RPCError(rsp, errs=errors)
protocol
operation-failed
error
syntax error
protcols
"""
rsp = XML(rsp_string)
errors = []
for err in rsp.findall('.//'+qualify('rpc-error')):
errors.append(RPCError(err))
raise RPCError(rsp, errs=errors)
#we need to remove the confusing netconf namespaces
#with our own function
rpc = self.__remove_namespace(xsd_fetch, self.__NETCONF_NAMESPACE)
#show how the created rpc message looks like
print "GENERATED RPC MESSAGE:\n"
print(etree.tostring(rpc,pretty_print=True))
#SENDING THE CREATED RPC XML to the server
#rpc_reply = without .xml the reply has GetReply type
#rpc_reply = with .xml we convert it to xml-string
try:
rpc_reply = self.__connection.dispatch(rpc).xml
except (ncclient.operations.rpc.RPCError) as e:
self.__logger.info("ncclient: RPCError: %s" % e)
raise RPCError(e)
return None
except (ncclient.transport.TransportError) as e:
self.__logger.info("ncclient: TransportError % s" % e)
#self.connect()
raise RPCError(e)
return None
except (ncclient.operations.rpc.OperationError) as e:
self.__logger.info("ncclient: OperationError: %s" % e)
# self.__logger.info("function (connect)Connected: %s"
# % self.__connection.connected)
# self.connect()
raise RPCError(e)
return None
:cmd: is the rpc command
:rsp: is the rpc response (after )
:errs: is a list of dictionaries of extracted elements.
:dev: is the device rpc was executed on
:timeout: is the timeout value of the device
:re: is the RE or member exception occured on
"""
self.cmd = cmd
self.rsp = rsp
self.dev = dev
self.timeout = timeout
self.re = re
self.rpc_error = None
self.xml = rsp
# To handle errors coming from ncclient, Here errs is list of RPCError
if isinstance(errs, RPCError) and hasattr(errs, 'errors'):
self.errs = [JXML.rpc_error(error.xml) for error in errs.errors]
for error in errs.errors:
if error.severity == 'error':
self.rsp = JXML.remove_namespaces(error.xml)
break
else:
if errs.severity == 'warning':
for error in errs.errors:
if error.severity == 'warning':
self.rsp = JXML.remove_namespaces(error.xml)
break
self.message = errs.message
else:
self.errs = errs
self.message = "\n".join(["%s: %s" % (err['severity'].strip(),
err['message'].strip())
def to_dict(self):
return dict([ (attr[1:], getattr(self, attr)) for attr in RPCError.tag_to_attr.values() ])
def removeNF (self, vnf_id):
"""
Stop and remove the given NF using the general RPC calls.
"""
with self as adapter:
try:
# return adapter.stopVNF(vnf_id=vnf_id)
reply = adapter.stopVNF(vnf_id=vnf_id)
from pprint import pprint
pprint(adapter.getVNFInfo())
return reply
except RPCError as e:
log.error("Got Error during removeVNF through NETCONF:")
raise
except KeyError as e:
log.warning(
"Missing required attribute from NETCONF-based RPC reply: %s! Skip "
"VNF initiation." % e.args[0])
except (TransportError, OperationError) as e:
log.error(
"Failed to remove NF due to a connection error! Cause: %s" % e)