Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
reqs=reqs.args, ret_num={}, source=found, all_keys=True)
local_address = keys[0].get('local')
try:
out = device.parse('show ted database extensive')
except SchemaEmptyParserError:
log.info('Parser is empty')
timeout.sleep()
continue
reqs = R([
'node', '(.*)', 'protocol', '(.*)', 'to', '(.*)', 'local',
local_address.split('/')[0], 'remote', '(.*)', 'metric',
'(?P.*)'
])
found = find([out], reqs, filter_=False, all_keys=True)
if found:
keys = GroupKeys.group_keys(
reqs=reqs.args,
ret_num={},
source=found,
all_keys=True)
if 'metric' in keys[0] and int(expected_cost) == int(
keys[0]['metric']):
return True
timeout.sleep()
return False
log.info('This api does not support cost type {}'.format(cost_type))
cmd = 'show ip route'
if protocol_codes is None:
protocol_codes = '(.*)'
try:
out = device.parse(cmd)
except Exception as e:
log.error("Failed to parse '{}':\n{}".format(cmd, e))
return routes
reqs = R(['vrf', '(.*)',
'address_family', '(.*)',
'routes', '(?P.*)',
'source_protocol_codes', protocol_codes])
found = find([out], reqs, filter_=False, all_keys=True)
if found:
keys = GroupKeys.group_keys(reqs=reqs.args, ret_num={},
source=found, all_keys=True)
for route in keys:
routes.append(route['route'])
else:
log.error("Could not find any route with protocol_codes '{}'".\
format(protocol_codes))
return routes
# len(requirements[0])
# when the requirements is like below:
# verify_ops={ \
# 'conf.vxlan.Vxlan': {
# 'requirements': [ \
# [NotExists('device_attr')]]}},
# verify_ops={ \
# 'ops.vxlan.vxlan.Vxlan': {
# 'requirements': [ \
# [NotExists('nve')]]}},
if len(requirements[0]) == 1 and isinstance(requirements[0][0], NotExists) and \
not getattr(ops, requirements[0][0].value, {}):
return
rs = [R(requirement) for requirement in requirements]
ret = find([ops], *rs, filter_=False, all_keys=all_keys)
# If missing is True, then we expect it to be missing, aka ret empty
if not ret and not missing:
raise Exception("'{req}' does not exists in "
"'{o}'".format(req=requirements, o=ops))
if ret and missing:
# It should be missing
raise Exception("'{req}' exists in "
"'{o}' and it should not "
"exists".format(req=requirements, o=ops))
"outgoing_interface",
interface,
"outgoing_interface",
interface,
]
)
while timeout.iterate():
try:
out = device.parse("show ip route connected")
except Exception as e:
log.info(e)
timeout.sleep()
continue
found = find([out], reqs, filter_=False, all_keys=True)
if flag == bool(found):
return True
timeout.sleep()
return False
def verify_ntp_synchronized_server_alias(self, server, device, alias=None):
'''Verify that a specific server is the synchronized ntp server
verify "1.1.1.1" is synchronized ntp server on device ""
'''
ops = self.genie_ops_on_device_alias('ntp', device, alias)
rs = [R(['info', 'clock_state', 'system_status', 'associations_address',
'(?P.*)'])]
output = find([ops], *rs, filter_=False, all_keys=True)
if not output:
self.builtin.fail("No synchronized server could be found! Was "
"expected '{}' to be synchronized".format(server))
if not output[0][0] == server:
self.builtin.fail("Expected synchronized server to be '{}', but "
"found '{}'".format(server, output[0][0]))
try:
out = device.parse('show mpls forwarding-table {}'.format(ip))
except SchemaEmptyParserError:
log.info("Device output is empty.")
result = False
timeout.sleep()
continue
reqs = R(['vrf', '(.*)',
'local_label', '(?P.*)',
'outgoing_label_or_vc', '(?P.*)',
'prefix_or_tunnel_id', '(?P.*)',
'outgoing_interface', '(?P.*)',
'next_hop', '(?P.*)'])
found = find([out], reqs, filter_=False, all_keys=True)
if found:
keys = GroupKeys.group_keys(reqs=reqs.args, ret_num={},
source=found, all_keys=True)
for route in keys:
if same_as_local:
log.info("Interface {route[interface]} has local label "
"'{route[local_label]}' and outgoing label "
"'{route[outgoing_label]}'".format(route=route))
if str(route['outgoing_label']) != str(route['local_label']):
result = False
else:
log.info("Interface {route[interface]} outgoing label is "
"'{route[outgoing_label]}', exepected to have label "
"'{expected}'".format(route=route, expected=expected_label))
if str(route['outgoing_label']) != str(expected_label):
def verify_ntp_synchronized_alias(self, device, alias=None):
'''Verify that NTP is synchronized on this device
verify NTP is synchronized on device ""
'''
ops = self.genie_ops_on_device_alias('ntp', device, alias)
rs = [R(['info', 'clock_state', 'system_status', 'associations_address',
'(?P.*)'])]
output = find([ops], *rs, filter_=False, all_keys=True)
if not output:
self.builtin.fail("{} does not have NTP synchronized".format(device))
log.info("Getting Affinity bits of interface {intf}".format(intf=interface))
cmd = 'show ip ospf interface {intf}'.format(intf=interface)
try:
out = device.parse(cmd)
except Exception as e:
log.error("Failed to parse '{cmd}': {e}".format(cmd=cmd, e=e))
return None
reqs = R(['vrf','(.*)',
'address_family','(.*)',
'instance','(.*)','areas','(.*)',
'interfaces','(.*)','teapp','(.*)',
'affinity','bits','(?P.*)'])
found = find([out], reqs, filter_=False, all_keys=True)
if found:
keys = GroupKeys.group_keys(reqs=reqs.args, ret_num={},
source=found, all_keys=True)
bits = keys[0]['bits']
log.info("Get affinity bits '{bits}' on {intf}".format(bits=bits, intf=interface))
return bits
else:
log.error("Failed to get affinity bits on {intf}".format(intf=interface))
return None
route=neighbor_modified,
address_family=address_family,
)
else:
log.error(
"Argument rd and vrf are mutually exclusive, "
"Only rd export or vrf name can be inserted"
)
return False
reqs = R(['instance','default','vrf','(?P.*)',
'address_family',address_family,'prefixes',
neighbor_address,'index','(?P.*)',
'community','(?P.*)'])
found = find([neighbor_output], reqs, filter_=False, all_keys=True)
if found:
keys = GroupKeys.group_keys(reqs=reqs.args, ret_num={},
source=found, all_keys=True)
for item in keys:
community_found.append(item['community'])
if check_not_match:
if community in community_found:
log.error("Found community {} in the command output"
.format(community))
else:
log.info("No community {} in the command output"
.format(community))
return True
else:
if community in community_found:
log.info("Getting standby slot")
rs = R(
["slot", "(?P.*)", "rp", "(?P.*)", "state", "ok, standby"]
)
timeout = Timeout(max_time=max_time, interval=interval)
while timeout.iterate():
try:
output = device.parse("show platform")
except SchemaEmptyParserError:
timeout.sleep()
continue
ret = find([output], rs, filter_=False, all_keys=True)
if ret:
standby_rp = ret[0][1][1]
srp = re.search("(?P(\d))", standby_rp).groupdict()["srp"]
if srp:
log.info(
"Standby RP on '{dev}' is: '{standby_rp}'".format(
dev=device.name, standby_rp=standby_rp
)
)
return srp
timeout.sleep()
return None