Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def rpc(self, name):
    """Execute an RPC on the remote device.

    :name: RPC request in string format; it is parsed with ``to_ele``
        and sent through ``self.m.rpc``.
    :returns: the reply's ``data_xml`` when the reply has that attribute,
        otherwise the reply's ``xml``.
    :raises Exception: when an ``RPCError`` is caught; the message is the
        result of ``to_xml`` applied to the error's ``xml``.
    """
    try:
        reply = self.m.rpc(to_ele(name))
        # Not every reply exposes data_xml; fall back to the raw xml.
        if hasattr(reply, 'data_xml'):
            return reply.data_xml
        return reply.xml
    except RPCError as exc:
        raise Exception(to_xml(exc.xml))
def exec_command(self, cmd, in_data=None, sudoable=True):
    """Send the request to the node and return the reply.

    When ``self._manager`` is set, ``cmd`` is converted to a native string,
    parsed into an XML element and sent as an RPC over the netconf session.
    Otherwise the call is delegated to the parent class implementation.

    NOTE(review): the previous docstring also described a json-rpc (2.0)
    byte-string form of request; no json-rpc handling is visible in this
    method -- confirm where (or whether) that form is handled.

    :returns: ``'unable to parse request'`` when ``to_ele`` yields ``None``;
        a JSON string built from ``self.internal_error`` when the RPC
        raises ``RPCError``; otherwise the reply's ``data_xml``.
    """
    if not self._manager:
        return super(Connection, self).exec_command(cmd, in_data, sudoable)

    # to_ele operates on native strings
    native_cmd = to_native(cmd, errors='surrogate_or_strict')
    request = to_ele(native_cmd)
    if request is None:
        return 'unable to parse request'

    try:
        reply = self._manager.rpc(request)
    except RPCError as exc:
        # Report the device error back to the caller as serialized JSON
        # instead of letting the exception propagate.
        error_xml = to_text(to_xml(exc.xml), errors='surrogate_or_strict')
        return json.dumps(self.internal_error(data=error_xml))
    else:
        return reply.data_xml
def vlan_update(self, number, description):
    """Build the element describing a VLAN update.

    :param number: VLAN number, substituted into the template twice.
    :param description: optional description; when not ``None`` an extra
        child element built from it is appended.
    :returns: the element produced by ``to_ele``.

    NOTE(review): the template literals below contain no XML markup;
    it looks like tags were lost somewhere -- confirm against the
    original source before relying on them.
    """
    vlan_node = to_ele("""
VLAN{0}
{0}
""".format(number))
    if description is None:
        return vlan_node

    vlan_node.append(to_ele("{}".format(description)))
    return vlan_node
def unset_interface_auto_negotiation_state(self, interface_id):
    """Remove any auto-negotiation setting from an interface.

    Queries the configuration for ``interface_id`` and, if either an
    ``ether-options/auto-negotiation`` or ``ether-options/no-auto-negotiation``
    node is present, pushes an interface update built from the templates
    below. Nothing is pushed when neither node is present.

    When no configuration node is found for the interface,
    ``self._get_physical_interface`` is called and the method returns
    (presumably that call validates that the interface exists -- confirm).

    NOTE(review): the template literals below are empty / carry no XML
    markup; it looks like tags were lost somewhere -- confirm against the
    original source.
    """
    running_config = self.query(one_interface(interface_id))
    interface_node = self.get_interface_config(interface_id, running_config)
    if interface_node is None:
        self._get_physical_interface(interface_id)
        return

    has_auto = first(interface_node.xpath('ether-options/auto-negotiation')) is not None
    has_no_auto = first(interface_node.xpath('ether-options/no-auto-negotiation')) is not None
    if not (has_auto or has_no_auto):
        # Nothing configured, so there is nothing to unset.
        return

    content = to_ele("""
{0}
""".format(interface_id))
    ether_options = to_ele("")
    if has_auto:
        ether_options.append(to_ele(""))
    elif has_no_auto:
        ether_options.append(to_ele(""))

    update = Update()
    content.append(ether_options)
    update.add_interface(content)
    self._push_interface_update(interface_id, update)
def rstp_interface_removal(interface_id):
    """Return the element built from the template for ``interface_id``.

    NOTE(review): the template literal carries no XML markup; it looks
    like tags were lost somewhere -- confirm against the original source.
    """
    template = """
{}
"""
    return to_ele(template.format(interface_id))
def m():
    """Return the element built from the template for ``vlan_id``.

    ``vlan_id`` is not a parameter: it is resolved from an outer scope
    (presumably an enclosing function that is not visible here -- confirm).

    NOTE(review): the template literal carries no XML markup; it looks
    like tags were lost somewhere -- confirm against the original source.
    """
    template = """
{}
"""
    return to_ele(template.format(vlan_id))
def set_interface_auto_negotiation_state(self, interface_id, negotiation_state):
    """Push an interface update setting the auto-negotiation state.

    :param interface_id: interface identifier, substituted into the
        template and passed to ``self._push_interface_update``.
    :param negotiation_state: compared against ``ON`` to choose which
        child element is appended.

    NOTE(review): both child literals below are empty strings, so the two
    branches look identical; it looks like XML markup was lost somewhere
    -- confirm against the original source.
    """
    interface_node = to_ele("""
{0}
""".format(interface_id))
    state_node = to_ele("") if negotiation_state == ON else to_ele("")
    interface_node.append(state_node)

    update = Update()
    update.add_interface(interface_node)
    self._push_interface_update(interface_id, update)
def bond_update(number, *aggregated_ether_options):
    """Build the element describing a bond update.

    :param number: bond number; converted with ``bond_name`` and substituted
        into the template.
    :param aggregated_ether_options: elements appended, in order, to the
        first node matched by ``//aggregated-ether-options``.
    :returns: the element produced by ``to_ele`` with the options appended.

    NOTE(review): the template literal carries no XML markup; it looks
    like tags were lost somewhere -- confirm against the original source.
    """
    content = to_ele("""
{0}
""".format(bond_name(number)))
    aggregated_ether_options_node = first(content.xpath("//aggregated-ether-options"))
    # Look the bound method up eagerly, as the map() call used to do.
    append_option = aggregated_ether_options_node.append
    # Use an explicit loop: map() returns a lazy iterator on Python 3, so
    # calling it only for its side effect would never append anything.
    for option in aggregated_ether_options:
        append_option(option)
    return content
def get_interface_port_mode_update_element(self, mode):
    """Return the element built from ``mode`` for a port-mode update.

    NOTE(review): the template literal is just ``{}`` with no XML markup;
    it looks like tags were lost somewhere -- confirm against the
    original source.
    """
    port_mode_xml = "{}".format(mode)
    return to_ele(port_mode_xml)
def unset_interface_description(self, interface_id):
    """Push an update removing the description of an interface.

    :param interface_id: interface identifier passed to
        ``interface_main_update``.
    :raises UnknownInterface: when the push fails with an ``RPCError``
        whose severity is anything other than ``"warning"``.

    NOTE(review): the child literal below is an empty string; it looks
    like XML markup was lost somewhere -- confirm against the original
    source.
    """
    update = Update()
    description_change = interface_main_update(interface_id, [
        to_ele("")
    ])
    update.add_interface(description_change)

    try:
        self._push(update)
    except RPCError as e:
        # Warnings are deliberately ignored; any other severity is
        # reported as an unknown interface.
        if e.severity == "warning":
            return
        raise UnknownInterface(interface_id)