Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
log.info(banner('Find {u} neighbors'))
try:
peer = sorted(configs.get('devices', {}))
peer.remove('uut')
peer = peer[0]
peer_dev = testbed.devices[peer]
except Exception as e:
section.failed('Cannot find peer neighbors')
log.info(
banner(
'Find {u} interfaces connect to neighbor {p}'.format(
u=uut.name,
p=peer.name)))
# find peer connection interfaces
rs = R(['(?P.*)', 'neighbors',
peer, 'port_id', '(?P.*)'])
ret = find([uut.lldp_mapping], rs, filter_=False, all_keys=True)
if ret:
values = GroupKeys.group_keys(
reqs=rs.args, ret_num={}, source=ret, all_keys=True)
else:
section.failed(
'No Peer inerface Found between {s} and {d}'.format(
s=uut.name, d=peer))
# get free(unassigned) interfaces
log.info(
banner(
'Find {u} free connected interfaces from {l}'.format(
u=uut.name,
l=values)))
Args:
device (`obj`): Device object
Returns:
fans (`list`): Fans info
Raises:
None
"""
fans = []
p = re.compile(r"Fan +Speed +(?P.*)%")
try:
out = device.parse("show environment | include Fan")
except (SchemaEmptyParserError, SubCommandFailure) as e:
return fans
reqs = R(
[
"slot",
"(?P.*)",
"sensor",
"(?P.*)",
"state",
"(?P.*)",
]
)
found = find([out], reqs, filter_=False, all_keys=True)
if found:
fans = GroupKeys.group_keys(
reqs=reqs.args, ret_num={}, source=found, all_keys=True
)
for fan in fans:
def get_ospf_interfaces(ops):
    '''Collect OSPF interface state and cost entries from an ops object.

    Two requirement paths are searched below
    ``info -> vrf -> address_family -> instance -> areas -> interfaces``:

    * ``state`` - only values matching dr, bdr, dr_other or point_to_point
    * ``cost``  - any value

    Args:
        ops: object handed to ``find`` as the single search source
            (presumably a learnt OSPF ops object - confirm with callers)

    Returns:
        Whatever ``GroupKeys.group_keys`` builds from the matches; the
        named groups in the requirement paths become the keys.

    Raises:
        None
    '''
    # NOTE(review): the capture groups previously read '(?P.*)', which is not
    # a valid Python regular expression (a named group must be written
    # '(?P<name>...)'). The group names below are reconstructed from the
    # surrounding path keys - confirm them against the callers that consume
    # the grouped result before relying on them.
    interface_path = [
        'info',
        'vrf', '(?P<vrf>.*)',
        'address_family', '(?P<af>.*)',
        'instance', '(?P<instance>.*)',
        'areas', '(?P<areas>.*)',
        'interfaces', '(?P<interface>.*)',
    ]

    # Both requirements share the same path down to the interface level and
    # differ only in the leaf attribute that is captured.
    reqs = [
        interface_path + ['state',
                          '(?P<state>(dr|bdr|dr_other|point_to_point))'],
        interface_path + ['cost', '(?P<cost>.*)'],
    ]

    rs = [R(req) for req in reqs]
    ret = find([ops], *rs, filter_=False, all_keys=True)

    # The raw requirement lists (not the R objects) are passed to group_keys,
    # as in the original call.
    return GroupKeys.group_keys(
        ret_num={}, source=ret, reqs=reqs, all_keys=True)
qlimit_bytes (`int`): Interface qlimit_bytes
Raises:
None
"""
try:
out = device.parse(
"show platform hardware qfp active infrastructure bqs "
"queue output default interface {interface}".format(
interface=interface
)
)
except SchemaEmptyParserError:
return
reqs = R(
[
interface,
"index",
"(?P.*)",
"software_control_info",
"qlimit_bytes",
"(?P.*)",
]
)
found = find([out], reqs, filter_=False, all_keys=True)
if found:
keys = GroupKeys.group_keys(
reqs=reqs.args, ret_num={}, source=found, all_keys=True
)
return keys[0]["qlimit"]
else:
interface (`str`): Interface name
ip_address (`str`): Interface ip address
prefix (`int`): prefix length
vrf (`str`): vrf name
flag (`bool`): True if verify present
False if verify not present
max_time (`int`): max time
check_interval (`int`): check interval
Returns:
result(`bool`): verify result
"""
timeout = Timeout(max_time, check_interval)
if prefix:
ip_address = "{ip}/{prefix}".format(ip=ip_address, prefix=prefix)
reqs = R(
[
"vrf",
"(?P.*)",
"address_family",
"ipv4",
"routes",
ip_address,
"next_hop",
"outgoing_interface",
interface,
"outgoing_interface",
interface,
]
)
while timeout.iterate():
'1/1/6':{'slot': '1',
'subslot': '1 transceiver 6',
'lc': 'ASR1000-SIP10',
'pid': 'SFP-GE-S',
'descr': 'GE SX'}}
Raises:
None
"""
log.info("Getting inventory on {}".format(device.name))
keys = []
try:
out = device.parse("show inventory")
except SchemaEmptyParserError:
return keys
reqs = R(
[
"slot",
"(?P.*)",
"lc",
"(?P.*)",
"subslot",
"(?P.*)",
"(?P.*)",
"descr",
"(?P" + sfp_descr + ")",
]
)
found = find([out], reqs, filter_=False, all_keys=True)
if found:
keys = GroupKeys.group_keys(
Args:
device (`obj`): Device object
system_peer (`str`): System peer ip
Returns:
interface (`str`): Interface name
"""
try:
out = device.parse("show ip cef {}".format(system_peer))
except SchemaEmptyParserError as e:
log.error(
"Command 'show ip cef {}' " "did not return any results".format(system_peer)
)
return None
reqs = R(
[
"vrf",
"(?P.*)",
"address_family",
"(?P.*)",
"prefix",
"(?P.*)",
"nexthop",
"(?P.*)",
"outgoing_interface",
"(?P.*)",
"(?:.*)",
]
)
found = find([out], reqs, filter_=False, all_keys=True)
Returns:
if can filter down to one result:
(('int'): SRGB Base value, ('dict'): Output from parser)
if cannot filter due to lack of arguments:
([{key:value},{key:value}], ('dict'): Output from parser)
Raises:
None
"""
try:
output = device.parse("show ip ospf segment-routing global-block")
except SchemaEmptyParserError:
return None, None
reqs_base = R(
[
"process_id",
process_id,
"routers",
router_id,
'srgb_base',
"(?P.*)",
]
)
found_base = find(output, reqs_base, filter_=False, all_keys=True)
if not found_base:
return None, None
reqs_range = R(
[
"process_id",
process_id ('str'): Ospf process_id
router_id ('str'): Which router_id entry to use
Returns:
if can filter down to one result:
(('int'): SRLB Base value, ('dict'): Output from parser)
Raises:
None
"""
try:
output = device.parse("show ip ospf segment-routing local-block")
except SchemaEmptyParserError:
return None, None
reqs_base = R(
[
"instance",
process_id,
"areas",
"(?P<area>.*)",
"router_id",
router_id,
"srlb_base",
"(?P.*)",
]
)
found_base = find(output, reqs_base, filter_=False, all_keys=True)
if not found_base:
return None, None
reqs_range = R(
Args:
device (`obj`): Device object
Returns:
result (`bool`): Verified result
'''
cmd = 'show isis neighbors'
timeout = Timeout(max_time, check_interval)
while timeout.iterate():
try:
out = device.parse(cmd)
except Exception:
return True
reqs = R(['isis', '(.*)',
'vrf', '(.*)', '(?P.*)'])
found = find([out], reqs, filter_=False, all_keys=True)
if found and not found[0][0]:
return True
timeout.sleep()
return True