Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# let's check empty files list filtration..
self.obj.engine.config.get(EXEC)[0]["files"] = []
cloud_config = self.obj.prepare_cloud_config()
self.assertIsNone(cloud_config.get("cli", None), None)
self.assertIsNone(cloud_config.get("cli-aliases", None), None)
cloud_execution = cloud_config.get(EXEC)[0]
cloud_modules = cloud_config.get("modules")
target_execution = {ScenarioExecutor.CONCURR: 33, "scenario": "sc1", "executor": "jmeter"}
target_modules = {"jmeter": {"just_option": "just_value"}}
self.assertEqual(cloud_execution, BetterDict.from_dict(target_execution))
self.assertEqual(cloud_modules, BetterDict.from_dict(target_modules))
def test_long_kpi(self):
    # Smoke test: a cumulative "stdev_rt" stored as a `long` must pass through
    # the whole FinalStatus life cycle while the "dump-xml" option is set.
    reporter = FinalStatus()
    reporter.engine = EngineEmul()
    xml_path = reporter.engine.create_artifact("status", ".xml")
    reporter.parameters = BetterDict.from_dict({"dump-xml": xml_path})

    # Overwrite the overall ("" label) cumulative stdev with a long zero.
    point = random_datapoint(time.time())
    point[point.CUMULATIVE][""]["stdev_rt"] = long(0)

    reporter.aggregated_second(point)
    reporter.startup()
    reporter.shutdown()
    reporter.post_process()
def test_distributed_props(self):
    # With distributed servers and a 'properties' setting configured, the
    # debug log written during prepare/startup must contain a '-G' argument.
    self.sniff_log(self.obj.log)

    script_path = RESOURCES_DIR + "/jmeter/jmx/http.jmx"
    self.configure({"execution": {"scenario": {"script": script_path}}})

    self.obj.distributed_servers = ["127.0.0.1", "127.0.0.1"]
    self.obj.settings['properties'] = BetterDict.from_dict({"a": 1})

    self.obj.prepare()
    self.obj.startup()

    debug_output = self.log_recorder.debug_buff.getvalue()
    self.assertIn("', '-G', '", debug_output)
def test_prepare_no_filename_in_settings(self):
obj = JUnitXMLReporter()
obj.engine = EngineEmul()
obj.parameters = BetterDict.from_dict({"data-source": "sample-labels"})
obj.prepare()
datapoint = DataPoint(0, [])
cumul_data = KPISet.from_dict({
KPISet.AVG_CONN_TIME: 7.890211417203362e-06,
KPISet.RESP_TIMES: Counter({
0.0: 32160, 0.001: 24919, 0.002: 1049, 0.003: 630, 0.004: 224, 0.005: 125,
0.006: 73, 0.007: 46, 0.008: 32, 0.009: 20, 0.011: 8, 0.01: 8, 0.017: 3,
0.016: 3, 0.014: 3, 0.013: 3, 0.04: 2, 0.012: 2, 0.079: 1, 0.081: 1,
0.019: 1, 0.015: 1
}),
KPISet.ERRORS: [{'msg': 'Forbidden', 'cnt': 7373, 'type': 0,
'urls': Counter({'http://192.168.25.8/': 7373}), KPISet.RESP_CODES: '403'}],
KPISet.STDEV_RESP_TIME: 0.04947974228872108,
KPISet.AVG_LATENCY: 0.0002825639815220692,
if os.path.isfile(source):
result_list.append(source)
else: # source is dir
self.log.debug("Compress directory '%s'", source)
base_dir_name = os.path.basename(source)
zip_name = self.engine.create_artifact(base_dir_name, '.zip')
relative_prefix_len = len(os.path.dirname(source))
with zipfile.ZipFile(zip_name, 'w') as zip_file:
for _file in get_files_recursive(source):
zip_file.write(_file, _file[relative_prefix_len:])
result_list.append(zip_name)
packed_list.append(base_dir_name + '.zip')
if packed_list:
services = self.engine.config.get(Service.SERV, [], force_set=True)
unpacker = BetterDict.from_dict({'module': Unpacker.UNPACK, Unpacker.FILES: packed_list, 'run-at': 'local'})
services.append(unpacker)
return result_list
converter = SoapUIScriptConverter(self.log)
conv_config = converter.convert_script(script_path)
conv_scenarios = conv_config["scenarios"]
scenario_name, conv_scenario = converter.find_soapui_test_case(test_case, conv_scenarios)
new_name = scenario_name
counter = 1
while new_name in self.engine.config["scenarios"]:
new_name = scenario_name + ("-%s" % counter)
counter += 1
if new_name != scenario_name:
self.log.info("Scenario name '%s' is already taken, renaming to '%s'", scenario_name, new_name)
scenario_name = new_name
merged_scenario = BetterDict.from_dict(conv_scenario)
merged_scenario.merge(base_scenario.data)
for field in [Scenario.SCRIPT, "test-case"]:
if field in merged_scenario:
merged_scenario.pop(field)
return scenario_name, merged_scenario
def _add_jar_tool(self, req_tool_class, **kwargs):
# todo: it's for backward compatibility only, remove it later
if "local_path" in kwargs:
local_path = kwargs.pop("local_path")
if local_path:
kwargs["config"] = BetterDict.from_dict({"config": local_path})
req_tool = self._get_tool(req_tool_class, **kwargs)
self._tools.append(req_tool)
self.class_path.append(req_tool.tool_path)
def _extract_test_case(self, test_case, test_suite, suite_level_props):
    """
    Build a named scenario from one test case of a test suite.

    :param test_case: test case node; its "name" is read via ``get`` and it is
        passed to ``self._extract_properties`` and ``self._extract_scenario``
    :param test_suite: owning test suite node; its "name" is read via ``get``
    :param suite_level_props: properties gathered at suite level; copied with
        ``BetterDict.from_dict`` before the case properties are merged in
    :return: tuple ``(scenario_name, scenario)`` where the name is
        "<suite name>-<case name>" and the scenario carries a 'test-suite' key
    """
    name_of_case = test_case.get("name")
    full_name = test_suite.get("name") + "-" + name_of_case

    # Every case-level property key gets the "#TestCase#" prefix.
    prefixed_props = {}
    for prop_name, prop_value in iteritems(self._extract_properties(test_case)):
        prefixed_props["#TestCase#" + prop_name] = prop_value

    merged_props = BetterDict.from_dict(suite_level_props)
    merged_props.merge(prefixed_props)

    extracted = self._extract_scenario(test_case, merged_props)
    extracted['test-suite'] = test_suite.get("name")
    return full_name, extracted
def __ensure_list_type(self, values):
"""
Ensure that values is a list, convert if needed
:param values: dict or list
:return:
"""
for idx, obj in enumerate(values):
if isinstance(obj, dict):
values[idx] = BetterDict.from_dict(obj)
elif isinstance(obj, list):
self.__ensure_list_type(obj)
def _get_scenario_label(self, name, scenarios):
if name is None: # get current scenario
exc = TaurusConfigError("Scenario is not found in execution: %s" % self.execution)
label = self.execution.get('scenario', exc)
is_script = isinstance(label, string_types) and label not in scenarios and \
os.path.exists(self.engine.find_file(label))
if isinstance(label, list):
msg = "Invalid content of scenario, list type instead of dict or string: %s"
raise TaurusConfigError(msg % label)
if isinstance(label, dict) or is_script:
self.log.debug("Extract %s into scenarios" % label)
if isinstance(label, string_types):
scenario = BetterDict.from_dict({Scenario.SCRIPT: label})
else:
scenario = label
path = self.get_script_path(scenario=Scenario(self.engine, scenario))
if path:
label = os.path.basename(path)
if not path or label in scenarios:
hash_str = str(hashlib.md5(to_json(scenario).encode()).hexdigest())
label = 'autogenerated_' + hash_str[-10:]
scenarios[label] = scenario
self.execution['scenario'] = label
self.label = label
else: # get scenario by name
label = name