Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_xml_format_passfail(self):
obj = JUnitXMLReporter()
obj.engine = EngineEmul()
obj.parameters = BetterDict()
obj.engine.provisioning = CloudProvisioning()
obj.engine.provisioning.results_url = "http://test/report/123"
pass_fail1 = CriteriaProcessor([], None)
crit_cfg1 = BetterDict()
crit_cfg2 = BetterDict()
crit_cfg3 = BetterDict()
crit_cfg4 = BetterDict()
crit_cfg1.merge({
'stop': True, 'label': 'Sample 1 Triggered', 'fail': True,
'timeframe': -1, 'threshold': '150ms', 'condition': '<', 'subject': 'avg-rt'})
crit_cfg2.merge({
'stop': True, 'label': 'Sample 1 Not Triggered', 'fail': True,
'timeframe': -1, 'threshold': '300ms', 'condition': '>', 'subject': 'avg-rt'})
crit_cfg3.merge({
'stop': True, 'label': 'Sample 2 Triggered', 'fail': True, 'timeframe': -1,
'threshold': '150ms', 'condition': '<=', 'subject': 'avg-rt'})
crit_cfg4.merge({
def _merge_and_compare(self, first, second, result):
sample = BetterDict().merge(first)
sample.merge(second)
result = BetterDict().merge(result)
self.assertEqual(sample, result)
def _filter_and_compare(self, first, second, result, black_list=False):
sample = BetterDict().merge(first)
sample.filter(second, black_list=black_list)
result = BetterDict().merge(result)
self.assertEqual(sample, result)
RESOURCES_DIR + "json/merge2.json",
]
obj.load(configs)
fname = temp_file()
obj.dump(fname, Configuration.JSON)
with open(fname) as fh:
ROOT_LOGGER.debug("JSON:\n%s", fh.read())
jmeter = obj['modules']['jmeter']
classval = jmeter['class']
self.assertEquals("bzt.modules.jmeter.JMeterExecutor", classval)
self.assertEquals("value", obj['key'])
self.assertEquals(6, len(obj["list-append"]))
self.assertEquals(2, len(obj["list-replace"]))
self.assertEquals(2, len(obj["list-replace-notexistent"]))
self.assertIsInstance(obj["list-complex"][1][0], BetterDict)
self.assertIsInstance(obj["list-complex"][1][0], BetterDict)
self.assertIsInstance(obj["list-complex"][1][0], BetterDict)
self.assertFalse("properties" in jmeter)
fname = temp_file()
obj.dump(fname, Configuration.JSON)
checker = Configuration()
checker.load([fname])
token = checker["list-complex"][1][0]['token']
self.assertNotEquals('test', token)
token_orig = obj["list-complex"][1][0]['token']
self.assertEquals('test', token_orig)
def __init__(self):
super(ScenarioExecutor, self).__init__()
self.env = Environment(log=self.log)
self.provisioning = None
self.execution = BetterDict() # FIXME: why have this field if we have `parameters` from base class?
self._cached_scenario = None
self.label = None
self.widget = None
self.reader = None
self.stdout = None
self.stderr = None
self.delay = None
self.start_time = None
self.preprocess_args = lambda x: None
else:
return self.test_suite + '.' + self.test_case
def get_short_name(self):
if self.path:
return '.'.join(comp["value"] for comp in self.path[-2:])
else:
return self.test_suite + '.' + self.test_case
def get_type(self):
if self.path:
return self.path[-1]["type"]
return None
class ResultsTree(BetterDict):
def __init__(self):
super(ResultsTree, self).__init__()
def add_sample(self, sample):
"""
:type sample: FunctionalSample
"""
test_suite = sample.test_suite
self.get(test_suite, [], force_set=True).append(sample)
def test_suites(self):
return [key for key, _ in iteritems(self)]
def test_cases(self, suite_name):
return self.get(suite_name, [])
def get_kpi_body(self, data_buffer, is_final):
# - reporting format:
# {labels: <data>, # see below
# sourceID: ,
# [is_final: True]} # for last report
#
# - elements of 'data' are described in __get_label()
#
# - elements of 'intervals' are described in __get_interval()
# every interval contains info about response codes have gotten on it.
report_items = BetterDict()
if data_buffer:
self.owner.first_ts = min(self.owner.first_ts, data_buffer[0][DataPoint.TIMESTAMP])
self.owner.last_ts = max(self.owner.last_ts, data_buffer[-1][DataPoint.TIMESTAMP])
# following data is received in the cumulative way
for label, kpi_set in iteritems(data_buffer[-1][DataPoint.CUMULATIVE]):
report_item = self.__get_label(label, kpi_set)
self.__add_errors(report_item, kpi_set) # 'Errors' tab
report_items[label] = report_item
# fill 'Timeline Report' tab with intervals data
# intervals are received in the additive way
for dpoint in data_buffer:
time_stamp = dpoint[DataPoint.TIMESTAMP]
for label, kpi_set in iteritems(dpoint[DataPoint.CURRENT]):
exc = TaurusInternalException('Cumulative KPISet is non-consistent')</data>
def _extract_request(self, path, path_obj, method, operation):
request = {}
if method != "get":
request["method"] = method.upper()
if operation.operation_id is not None:
request["label"] = operation.operation_id
parameters = BetterDict()
if path_obj.parameters:
parameters.merge(path_obj.parameters)
if operation.parameters:
parameters.merge(operation.parameters)
query_params, form_data, request_body, headers = self._handle_parameters(parameters)
if headers:
request["headers"] = headers
if form_data and request_body:
self.log.warning("Both form data and request body are specified. Omitting form data")
if request_body:
request["body"] = request_body
elif form_data:
def string_to_config(crit_config):
"""
Parse string like "avg-rt of label>100ms for 1m, continue as non-failed"
into config dict
:type crit_config: str
:rtype: dict
"""
res = BetterDict.from_dict({
"subject": None,
"condition": None,
"threshold": None,
"logic": "for",
"timeframe": 0,
"label": "",
"stop": True,
"fail": True,
"message": None,
})
if ':' in crit_config:
res['message'] = crit_config[:crit_config.index(':')].strip()
crit_config = crit_config[crit_config.index(':') + 1:].strip()
if ',' in crit_config:
def _extract_transfer(self, transfer):
source_type = transfer.findtext('./con:sourceType', namespaces=self.NAMESPACES)
source_step_name = transfer.findtext('./con:sourceStep', namespaces=self.NAMESPACES)
query = transfer.findtext('./con:sourcePath', namespaces=self.NAMESPACES)
transfer_type = transfer.findtext('./con:type', namespaces=self.NAMESPACES)
target_step_name = transfer.findtext('./con:targetStep', namespaces=self.NAMESPACES)
target_prop = transfer.findtext('./con:targetType', namespaces=self.NAMESPACES)
if source_step_name.startswith("#") and source_step_name.endswith("#"):
source_step_name = source_step_name[1:-1]
if not self._validate_transfer(source_type, source_step_name, transfer_type, target_step_name):
return None
extractor = BetterDict()
if transfer_type == "JSONPATH":
extractor.merge({
'extract-jsonpath': {
target_prop: {
'jsonpath': query,
'default': 'NOT_FOUND',
}
}
})
elif transfer_type == "XPATH":
extractor.merge({
'extract-xpath': {
target_prop: {
'xpath': query,
'default': 'NOT_FOUND',
}