Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_current_packet_rate(self, first_sample=False):
    """Sample the current packet rate on the TGN through the Tcl interpreter.

    Args:
        first_sample: When true, the value of Tcl `clock seconds` is stored
            in `test_params(TGN,FirstRateSampleTimeStamp)` after a
            successful sample.

    Raises:
        GenieTgnError: If the Tcl command returns anything other than '1'.
    """
    # expect1.1> TGNGetCurrentPacketRate -array test_params
    status = tcl.eval('TGNGetCurrentPacketRate -array test_params')

    # Any return value other than '1' is treated as a failure
    if status != '1':
        raise GenieTgnError(
            "Unable to get current packet rate on device '{}'".format(self.device.name))

    log.info(
        "Successfully got current packet rate on device '{}'".format(self.device.name))

    # Record when the first rate sample was taken
    if first_sample:
        sample_time = tcl.eval('clock seconds')
        tcl.q.set('test_params(TGN,FirstRateSampleTimeStamp)', sample_time)
def send_arp_on_interface(self):
    """Send ARP on interface by invoking the psat-ng Tcl procedure.

    Raises:
        GenieTgnError: If the Tcl command returns anything other than '1'.
    """
    # expect1.1> TGNSendArpOnInterface
    status = tcl.eval('::psat-ng::TGNSendArpOnInterface')

    # Bail out early on any non-'1' return value
    if status != '1':
        raise GenieTgnError(
            "Unable to send ARP on interface on TGN device '{}'".format(self.device.name))

    log.info(
        "Successfully sent ARP on interface on TGN device '{}'".format(self.device.name))
def start_routing(self):
    """Start routing on the TGN by evaluating the `TGNRouting` Tcl command.

    Raises:
        GenieTgnError: If the Tcl command returns anything other than '1'.
    """
    # expect1.1> TGNRouting
    outcome = tcl.eval('TGNRouting')

    # Only a return value of '1' counts as success
    if outcome != '1':
        raise GenieTgnError(
            "Failed to start routing on device '{}'".format(self.device.name))

    log.info(
        "Successfully started routing on device '{}'".format(self.device.name))
def start_traffic(self):
    """Start traffic on the TGN by evaluating the `TGNTraffic` Tcl command.

    Raises:
        GenieTgnError: If the Tcl command returns anything other than '1'.
    """
    # expect1.1> TGNTraffic
    outcome = tcl.eval('TGNTraffic')

    # Only a return value of '1' counts as success
    if outcome != '1':
        raise GenieTgnError(
            "Failed to start traffic on device '{}'".format(self.device.name))

    log.info(
        "Successfully started traffic on device '{}'".format(self.device.name))
def learn_traffic_streams(self):
    """Learn traffic streams by evaluating the `LearnTGN` Tcl command.

    Raises:
        GenieTgnError: If the Tcl command returns anything other than '1'.
    """
    # expect1.1> LearnTGN
    status = tcl.eval('LearnTGN')

    # Any return value other than '1' is treated as a failure
    if status != '1':
        raise GenieTgnError(
            "Unable to learn traffic streams on device '{}'".format(self.device.name))

    log.info(
        "Successfully learned traffic streams on device '{}'".format(self.device.name))
def poll_traffic_until_traffic_resumes(self, timeout=60, delay_check_traffic=10):
    """Poll the TGN until traffic resumes, via `TGNPollUntilTrafficResumes`.

    Args:
        timeout: Value in seconds [default 60 seconds]; passed unchanged as
            the `-timeout` option.
        delay_check_traffic: Value in seconds [default 10 seconds];
            multiplied by 1000 before being passed as `-delayCheckTraffic`.

    Raises:
        GenieTgnError: If the Tcl command returns anything other than '1'.
    """
    # PSAT TGN takes the delay in milliseconds
    delay_ms = delay_check_traffic * 1000

    # expect1.1> TGNPollUntilTrafficResumes -timeout $maxOutageTime -delayCheckTraffic $delayCheckTraffic
    command = ('TGNPollUntilTrafficResumes -timeout {timeout}'
               ' -delayCheckTraffic {delay}').format(timeout=timeout, delay=delay_ms)
    outcome = tcl.eval(command)

    # A return value of '1' means success; nothing more to do
    if outcome == '1':
        return

    raise GenieTgnError(
        "Traffic failure observed on device '{}'".format(self.device.name))
def get_reference_packet_rate(self):
    """Get the reference packet rate via `TGNGetReferencePacketRate`.

    The `-maxMinutes` option is given as the Tcl variable reference
    `$test_params(TGNWaitForRefRateMinutes)`, which is substituted by the
    Tcl interpreter rather than by Python.

    Raises:
        GenieTgnError: If the Tcl command returns anything other than '1'.
    """
    # expect1.1> TGNGetReferencePacketRate -maxMinutes $test_params(TGNWaitForRefRateMinutes)
    status = tcl.eval(
        'TGNGetReferencePacketRate -maxMinutes $test_params(TGNWaitForRefRateMinutes)')

    # Any return value other than '1' is treated as a failure
    if status != '1':
        raise GenieTgnError(
            "Unable to get reference packet rate on device '{}'".format(self.device.name))

    log.info(
        "Successfully got reference packet rate on device '{}'".format(self.device.name))
else:
raise Exception("env(ENA_TESTS) not set or its path does not exist")
# Check for required env vars
required_env_vars = ['AUTOTEST', 'ATS_EASY', 'IXIA_HOME', 'IXIA_VERSION',
'IXIA_HLTAPI_LIBRARY', 'XBU_SHARED', 'TCLLIBPATH',
'TCL_PKG_PREFER_LATEST']
for env_var in required_env_vars:
if env_var not in os.environ:
raise Exception("env({}) has not been set".format(env_var))
# Required TCL Packages
tcl.q.package('require', 'cAAs')
tcl.q.package('require', 'psat-ng')
tcl.q.package('require', 'enaTgnUtils')
tcl.eval('namespace import -force ::enaTgnUtils::*')
# Source PSAT lib files
try:
tcl.q.source(psat_tgn)
tcl.q.source(psat_lib)
except:
raise Exception("Unable to source PSAT files required for TGN")
# Get arguments from TGN testbed YAML file
try:
address = self.connection_info['address']
controller = self.connection_info['controller']
handle = self.connection_info['handle']
tgn_type = self.connection_info['tgn_type']
except KeyError as e:
raise Exception('Argument not provided in TGN YAML file') from e
def clear_stats(self):
    """Clear TGN statistics by evaluating the `TGNClearAllStats` Tcl command.

    Raises:
        GenieTgnError: If the Tcl command returns anything other than '1'.
    """
    # expect1.1> TGNClearAllStats
    result = tcl.eval('TGNClearAllStats')

    # Any return value other than '1' is treated as a failure
    if result != '1':
        # Message wording corrected: was "statistics device on", which
        # transposed the words and disagreed with the success message.
        raise GenieTgnError(
            "Unable to clear statistics on device '{}'".format(self.device.name))

    log.info(
        "Successfully cleared statistics on device '{}'".format(self.device.name))