Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if path in testcases_cache_mapping:
return testcases_cache_mapping[path]
if os.path.isdir(path):
files_list = utils.load_folder_files(path)
testcases_list = load_testsets_by_path(files_list)
elif os.path.isfile(path):
try:
testset = load_test_file(path)
if testset["testcases"] or testset["api"]:
testcases_list = [testset]
else:
testcases_list = []
except exception.FileFormatError:
testcases_list = []
else:
logger.log_error(u"file not found: {}".format(path))
testcases_list = []
testcases_cache_mapping[path] = testcases_list
return testcases_list
# load api definitions
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
api_files = utils.load_folder_files(api_def_folder)
for test_file in api_files:
testset = load_test_file(test_file)
test_def_overall_dict["api"].update(testset["api"])
# load suite definitions
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
suite_files = utils.load_folder_files(suite_def_folder)
for suite_file in suite_files:
suite = load_test_file(suite_file)
if "def" not in suite["config"]:
raise exception.ParamsError("def missed in suite file: {}!".format(suite_file))
call_func = suite["config"]["def"]
function_meta = parse_function(call_func)
suite["function_meta"] = function_meta
test_def_overall_dict["suite"][function_meta["func_name"]] = suite
"person": {
"name": {
"first_name": "Leo",
"last_name": "Lee",
},
"age": 29,
"cities": ["Guangzhou", "Shenzhen"]
}
}
@param (str) query
"person.name.first_name" => "Leo"
"person.cities.0" => "Guangzhou"
@return queried result
"""
if json_content == "":
raise exception.ResponseError("response content is empty!")
try:
for key in query.split(delimiter):
if isinstance(json_content, list):
json_content = json_content[int(key)]
elif isinstance(json_content, (dict, CaseInsensitiveDict)):
json_content = json_content[key]
else:
raise exception.ParseResponseError(
"response content is in text format! failed to query key {}!".format(key))
except (KeyError, ValueError, IndexError):
raise exception.ParseResponseError("failed to query json when extracting response!")
return json_content
logger.log_error(err_msg)
raise exception.ParamsError(err_msg)
if sub_query:
if not isinstance(top_query_content, (dict, CaseInsensitiveDict, list)):
try:
# TODO: remove compatibility for content, text
if isinstance(top_query_content, bytes):
top_query_content = top_query_content.decode("utf-8")
top_query_content = json.loads(top_query_content)
except json.decoder.JSONDecodeError:
err_msg = u"Failed to extract data with delimiter!\n"
err_msg += u"response content: {}\n".format(self.content)
err_msg += u"regex: {}\n".format(field)
logger.log_error(err_msg)
raise exception.ParamsError(err_msg)
# e.g. key: resp_headers_content_type, sub_query = "content-type"
return utils.query_json(top_query_content, sub_query)
else:
# e.g. key: resp_status_code, resp_content
return top_query_content
except AttributeError:
err_msg = u"Failed to extract value from response!\n"
err_msg += u"response content: {}\n".format(self.content)
err_msg += u"extract field: {}\n".format(field)
logger.log_error(err_msg)
raise exception.ParamsError(err_msg)
def override_variables_binds(variables, new_mapping):
    """ build the variables mapping of a testcase and overlay new_mapping on it

    @param (list/dict/OrderedDict) variables
        a list is first passed through convert_to_order_dict;
        a dict or OrderedDict is used as it is.
    @param new_mapping
        handed to update_ordered_dict as the overriding mapping.
    @return whatever update_ordered_dict returns for the two mappings
    @raise exception.ParamsError if variables is neither a list nor a dict
    """
    if isinstance(variables, list):
        return update_ordered_dict(
            convert_to_order_dict(variables),
            new_mapping
        )

    if not isinstance(variables, (OrderedDict, dict)):
        raise exception.ParamsError("variables error!")

    return update_ordered_dict(variables, new_mapping)
# 1, variable reference, e.g. $token
# 2, function reference, e.g. ${is_status_code_200($status_code)}
# 3, dict or list, maybe containing variable/function reference, e.g. {"var": "$abc"}
# 4, string joined by delimiter. e.g. "status_code", "headers.content-type"
# 5, regex string, e.g. "LB[\d]*(.*)RB[\d]*"
if isinstance(check_item, (dict, list)) \
or testcase.extract_variables(check_item) \
or testcase.extract_functions(check_item):
# format 1/2/3
check_value = self.eval_content(check_item)
else:
try:
# format 4/5
check_value = resp_obj.extract_field(check_item)
except exception.ParseResponseError:
msg = "failed to extract check item from response!\n"
msg += "response content: {}".format(resp_obj.content)
raise exception.ParseResponseError(msg)
validator["check_value"] = check_value
# expect_value should only be in 2 types:
# 1, variable reference, e.g. $expect_status_code
# 2, actual value, e.g. 200
expect_value = self.eval_content(validator["expect"])
validator["expect"] = expect_value
validator["check_result"] = "unchecked"
return validator
if os.path.isfile(target_file):
imported_module = get_imported_module_from_file(target_file)
items_dict = filter_module(imported_module, item_type)
if item_name in items_dict:
return items_dict[item_name]
else:
return search_conf_item(dir_path, item_type, item_name)
if dir_path == start_path:
# system root path
err_msg = "{} not found in recursive upward path!".format(item_name)
if item_type == "function":
raise exception.FunctionNotFound(err_msg)
else:
raise exception.VariableNotFound(err_msg)
return search_conf_item(dir_path, item_type, item_name)
testcase_parametered_variables_list = self._get_parametered_variables(
testcase_dict.get("variables", []),
testcase_dict.get("parameters", [])
)
for testcase_variables in testcase_parametered_variables_list:
testcase_dict["variables"] = testcase_variables
# eval testcase name with bind variables
variables = utils.override_variables_binds(
config_variables,
testcase_variables
)
self.testcase_parser.update_binded_variables(variables)
try:
testcase_name = self.testcase_parser.eval_content_with_bindings(testcase_dict["name"])
except (AssertionError, exception.ParamsError):
logger.log_warning("failed to eval testcase name: {}".format(testcase_dict["name"]))
testcase_name = testcase_dict["name"]
self.test_runner_list.append((test_runner, variables))
self._add_test_to_suite(testcase_name, test_runner, testcase_dict)
def run(self):
    """ run every test of every suite held in self.test_suite_list

    A test whose runTest() raises exception.MyBaseError does not abort the
    run: the error is reported by firing locust's request_failure event
    (with the method and url of the testcase request, and response_time 0),
    then the loop moves on to the next test.
    """
    for suite in self.test_suite_list:
        for case in suite:
            try:
                case.runTest()
            except exception.MyBaseError as error:
                # imported here, so locust is only loaded when a failure is reported
                from locust.events import request_failure

                # "request" may be absent from the testcase; fall back to an empty dict
                request_spec = case.testcase_dict.get("request", {})
                request_failure.fire(
                    request_type=request_spec.get("method"),
                    name=request_spec.get("url"),
                    response_time=0,
                    exception=error
                )