Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
console.setLevel(logging.ERROR)
logging.getLogger('').addHandler(console)
load_questions()
bf_init_snapshot(args.candidate, name='candidate')
bf_init_snapshot(args.failure, name='failure')
bf_set_snapshot('candidate')
csFailed = test_config_sanity(False)
cpFailed = test_controlplane(False)
dpFailed = test_dataplane(False, fromNode='leaf-3')
logging.info("\nProgress: analysing failure conditions")
bf_set_snapshot('failure')
dpFailedoutage = test_dataplane(False, fromNode='leaf-3')
rr = bfq.differentialReachability().answer(snapshot='candidate', reference_snapshot='failure')
print_reduced_rechability(rr.get('answerElements')[0])
return 0 if not any([cpFailed, dpFailed, csFailed, dpFailedoutage]) else 1
def test_controlplane(isFailed):
    """Check BGP peering invariants between leaf and spine nodes.

    Two properties are verified against the current snapshot:
    every leaf has a BGP session towards each spine, and leaves have
    no BGP session towards anything that is not a spine.

    Args:
        isFailed: Failure flag carried in from the caller.

    Returns:
        True when either check found a violation; otherwise the incoming
        ``isFailed`` value unchanged.
    """
    # Define a list of Spine switches
    spines = set(bfq.nodeProperties(nodes='spine.*').answer().frame()['Node'])
    logging.info("Progress: Analyzing control plane properties")

    # Get all BGP session status for leaf nodes
    bgp = bfq.bgpSessionStatus(nodes='leaf.*').answer().frame()

    # All leaves should have at least one peering with each spine.
    # A leaf violates this when some spine is absent from its remote peers,
    # i.e. spines - remote_nodes is non-empty. (The reverse difference would
    # only detect peers that are not spines, which is the next check's job.)
    violators = bgp.groupby('Node').filter(
        lambda sessions: bool(spines.difference(sessions['Remote_Node'])))
    if len(violators) > 0:
        logging.error("Found leaves that do not have at least one peering to each spine")
        logging.error(violators[['Node', 'Remote_Node']])
        isFailed = True
    else:
        logging.info("All leaves have at least one peering with each spine")

    # All leaves should only peer with spines
    non_spines = bgp[~bgp['Remote_Node'].str.contains('spine', na=False)]
    if len(non_spines) > 0:
        logging.error("Leaves do not only peer with spines")
        logging.error(non_spines[['Node', 'Remote_Node']])
        isFailed = True

    # Callers combine this flag into the process exit status.
    return isFailed
# Data-plane check: sets up traceroute bookkeeping from `fromNode` towards the
# loopback addresses of the other leaves.
# NOTE(review): the body is cut off in this view (the final `for` has no
# visible body), so the rest of the behaviour cannot be documented from here.
def test_dataplane(isFailed, fromNode, checkMultipath=True):
# Fetch the IP ownership table and keep only active Loopback0 addresses.
ips = bfq.ipOwners().answer().frame()
loopbacks = ips[(ips['Interface'] == 'Loopback0') & (ips['Active'])]
# Loopback IP of the starting node.
# NOTE(review): `[0]` on the filtered Series is a label lookup, not a
# positional one - presumably `.iloc[0]` was intended; confirm.
localIP = loopbacks[loopbacks['Node'] == fromNode]['IP'][0]
# Loopback IPs of every node whose name contains 'leaf', minus the local one.
# set.remove raises KeyError if localIP is not in the set (e.g. fromNode's
# name does not contain 'leaf').
leaves = set(loopbacks[loopbacks['Node'].str.contains('leaf')]['IP'])
leaves.remove(localIP)
# Number of spine loopbacks; not used in the visible part of the body -
# presumably the expected multipath width, TODO confirm.
spines = set(loopbacks[loopbacks['Node'].str.contains('spine')]['IP'])
mpath = len(spines)
logging.info("Progress: Analyzing traceroute from Leaf-3 to Leaf-1 and Leaf-2")
# Set up the resulting data structure: one empty dict per destination leaf IP
troute = dict()
for leaf in leaves:
troute[leaf] = dict()
# Build headers for traceroute flows
for leaf in leaves:
def test_config_sanity(isFailed):
    """Report undefined references and unused structures in the snapshot.

    Each non-empty answer table is logged at error level and marks the
    run as failed.

    Args:
        isFailed: Failure flag carried in from the caller.

    Returns:
        True when either query returned rows; otherwise the incoming
        ``isFailed`` value unchanged.
    """
    logging.info("Progress: Searching for unused and undefined data structures")

    # Find all undefined data structures
    undefined = bfq.undefinedReferences().answer().frame()
    if len(undefined) > 0:
        logging.error("Found undefined data structures")
        logging.error(undefined)
        isFailed = True
    else:
        logging.info("No undefined data structures found")

    # Find all unused data structures
    unused = bfq.unusedStructures().answer().frame()
    if len(unused) > 0:
        logging.error("Found unused data structures")
        logging.error(unused)
        isFailed = True
    else:
        logging.info("No unused data structures found")

    # Without this the caller always receives None and never sees a failure.
    return isFailed
spines = set(loopbacks[loopbacks['Node'].str.contains('spine')]['IP'])
mpath = len(spines)
logging.info("Progress: Analyzing traceroute from Leaf-3 to Leaf-1 and Leaf-2")
# Set up the resulting data structure
troute = dict()
for leaf in leaves:
troute[leaf] = dict()
# Build headers for traceroute flows
for leaf in leaves:
troute[leaf]['header'] = header(srcIps=localIP,dstIps=leaf)
# Ask questions about traceroute
for leaf,data in troute.items():
troute[leaf]['trace'] = bfq.traceroute(startLocation=fromNode, headers=data['header']).answer()
# Get first flow disposition for traces
for leaf,data in troute.items():
troute[leaf]['result'] = data['trace'].get('answerElements')[0]['rows'][0]['Traces'][0]['disposition']
# Get traceroute paths to reach Leaf-1 and Leaf-2
for leaf,data in troute.items():
troute[leaf]['paths'] = data['trace'].get('answerElements')[0]['rows'][0]['Traces']
# Get traceroute hops to reach Leaf-1 and Leaf-2
for leaf,data in troute.items():
troute[leaf]['hops'] = data['trace'].get('answerElements')[0]['rows'][0]['Traces'][0].get('hops',[])
# Now let's check that the traceroute behaves as we expect
for leaf, data in troute.items():
if data['result'] != 'ACCEPTED':
def test_config_sanity(isFailed):
    """Scan the snapshot for undefined references and unused structures.

    Any rows returned by either question are logged as errors and flip
    the failure flag to True.

    Args:
        isFailed: Failure flag carried in from the caller.

    Returns:
        The updated failure flag.
    """
    logging.info("Progress: Searching for unused and undefined data structures")

    # Undefined data structures: referenced somewhere but never defined
    undefined_refs = bfq.undefinedReferences().answer().frame()
    if len(undefined_refs) == 0:
        logging.info("No undefined data structures found")
    else:
        logging.error("Found undefined data structures")
        logging.error(undefined_refs)
        isFailed = True

    # Unused data structures: defined but never referenced
    unused_structs = bfq.unusedStructures().answer().frame()
    if len(unused_structs) == 0:
        logging.info("No unused data structures found")
    else:
        logging.error("Found unused data structures")
        logging.error(unused_structs)
        isFailed = True

    return isFailed
def test_controlplane(isFailed):
    """Check BGP peering invariants between leaf and spine nodes.

    Two properties are verified against the current snapshot:
    every leaf has a BGP session towards each spine, and leaves have
    no BGP session towards anything that is not a spine.

    Args:
        isFailed: Failure flag carried in from the caller.

    Returns:
        True when either check found a violation; otherwise the incoming
        ``isFailed`` value unchanged.
    """
    # Define a list of Spine switches
    spines = set(bfq.nodeProperties(nodes='spine.*').answer().frame()['Node'])
    logging.info("Progress: Analyzing control plane properties")

    # Get all BGP session status for leaf nodes
    bgp = bfq.bgpSessionStatus(nodes='leaf.*').answer().frame()

    # All leaves should have at least one peering with each spine.
    # A leaf violates this when some spine is absent from its remote peers,
    # i.e. spines - remote_nodes is non-empty. (The reverse difference would
    # only detect peers that are not spines, which is the next check's job.)
    violators = bgp.groupby('Node').filter(
        lambda sessions: bool(spines.difference(sessions['Remote_Node'])))
    if len(violators) > 0:
        logging.error("Found leaves that do not have at least one peering to each spine")
        logging.error(violators[['Node', 'Remote_Node']])
        isFailed = True
    else:
        logging.info("All leaves have at least one peering with each spine")

    # All leaves should only peer with spines
    non_spines = bgp[~bgp['Remote_Node'].str.contains('spine', na=False)]
    if len(non_spines) > 0:
        # Act on the computed frame instead of discarding it.
        logging.error("Leaves do not only peer with spines")
        logging.error(non_spines[['Node', 'Remote_Node']])
        isFailed = True

    # Callers combine this flag into the process exit status.
    return isFailed
# NOTE(review): this test is cut off in this view - `nodes` is assigned but
# not yet used, `mock_peers` gets no return value, and no call or assertion is
# visible. The comments below cover only the setup that is shown.
def test_get_facts_questions():
"""Test that get facts calls the right questions, passing through the right args."""
# Session built with load_questions=False; presumably bf.q then has no
# question attributes of its own, which is why create=True is needed below.
bf = Session(load_questions=False)
nodes = 'foo'
# Patch the four question factories on bf.q. create=True lets patch.object
# add each attribute even when it does not already exist on the target.
with patch.object(bf.q,
'nodeProperties',
create=True) as mock_node, \
patch.object(bf.q,
'interfaceProperties',
create=True) as mock_iface, \
patch.object(bf.q,
'bgpPeerConfiguration',
create=True) as mock_peers, \
patch.object(bf.q,
'bgpProcessConfiguration',
create=True) as mock_proc:
# Each patched factory returns a MockQuestion wrapping a default MockTableAnswer.
mock_node.return_value = MockQuestion(MockTableAnswer())
mock_iface.return_value = MockQuestion(MockTableAnswer())
mock_proc.return_value = MockQuestion(MockTableAnswer())
def test_run_assertion():
    """Check that a passing assertion produces the pass message."""
    session = Session(load_questions=False)
    spec = dict(name='assert_name', type='assert_no_undefined_references')

    # create=True: the question attribute may not exist on session.q yet
    undef_patch = patch.object(session.q, 'undefinedReferences', create=True)
    with undef_patch as undef_mock:
        undef_mock.return_value = MockQuestion(MockTableAnswer())
        outcome = run_assertion(session, spec)
        assert outcome == ASSERT_PASS_MESSAGE
def test_run_assertion_fail():
    """Check that a failing assertion is reported as a failure, not a pass."""
    session = Session(load_questions=False)
    spec = dict(name='assert_name', type='assert_no_undefined_references')

    # create=True: the question attribute may not exist on session.q yet
    undef_patch = patch.object(session.q, 'undefinedReferences', create=True)
    with undef_patch as undef_mock:
        # Answer the question with a single-row table
        rows = DataFrame.from_records([{'Undef': 'something'}])
        undef_mock.return_value = MockQuestion(MockTableAnswer(rows))

        message = run_assertion(session, spec)
        assert ASSERT_PASS_MESSAGE not in message
        assert 'Found undefined reference(s), when none were expected' in message