Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
- (dict) testset_dict
- (list) list of testset_dict
[
testset_dict_1,
testset_dict_2
]
mapping (dict): if mapping specified, it will override variables in config block.
Returns:
instance: HttpRunner() instance
"""
try:
test_suite_list = init_test_suites(path_or_testcases, mapping)
except exceptions.TestcaseNotFound:
logger.log_error("Testcases not found in {}".format(path_or_testcases))
sys.exit(1)
self.summary = {
"success": True,
"stat": {},
"time": {},
"platform": get_platform(),
"details": []
}
def accumulate_stat(origin_stat, new_stat):
"""accumulate new_stat to origin_stat."""
for key in new_stat:
if key not in origin_stat:
origin_stat[key] = new_stat[key]
elif key == "start_at":
return getattr(self, top_query)
# cookies
elif top_query == "cookies":
cookies = self.cookies
if not sub_query:
# extract cookies
return cookies
try:
return cookies[sub_query]
except KeyError:
err_msg = u"Failed to extract cookie! => {}\n".format(field)
err_msg += u"response cookies: {}\n".format(cookies)
logger.log_error(err_msg)
raise exceptions.ExtractFailure(err_msg)
# elapsed
elif top_query == "elapsed":
available_attributes = u"available attributes: days, seconds, microseconds, total_seconds"
if not sub_query:
err_msg = u"elapsed is datetime.timedelta instance, attribute should also be specified!\n"
err_msg += available_attributes
logger.log_error(err_msg)
raise exceptions.ParamsError(err_msg)
elif sub_query in ["days", "seconds", "microseconds"]:
return getattr(self.elapsed, sub_query)
elif sub_query == "total_seconds":
return self.elapsed.total_seconds()
else:
err_msg = "{} is not valid datetime.timedelta attribute.\n".format(sub_query)
# get the length of the content, but if the argument stream is set to True, we take
# the size from the content-length header, in order to not trigger fetching of the body
if kwargs.get("stream", False):
self.meta_data["response"]["content_size"] = int(self.meta_data["response"]["headers"].get("content-length") or 0)
else:
self.meta_data["response"]["content_size"] = len(response.content or "")
# log response details in debug mode
log_print("response")
try:
response.raise_for_status()
except RequestException as e:
logger.log_error(u"{exception}".format(exception=str(e)))
else:
logger.log_info(
"""status_code: {}, response_time(ms): {} ms, response_length: {} bytes""".format(
self.meta_data["response"]["status_code"],
self.meta_data["response"]["response_time_ms"],
self.meta_data["response"]["content_size"]
)
)
return response
def render_html_report(summary, html_report_name=None, html_report_template=None):
    """Prepare the template path, report directory and file name for an HTML report.

    Args:
        summary (dict): test run summary. ``summary["time"]["start_at"]`` is
            read and its ``":"`` characters are replaced with ``"-"``
            (presumably a timestamp string; confirm against the caller).
            ``summary["html_report_name"]`` is overwritten in place.
        html_report_name (str): optional report name. When given, it is stored
            in the summary, used as a sub-directory of ``<cwd>/reports`` and
            prefixed to the file name as ``<name>-<start_at>.html``. When
            omitted, the summary entry is set to ``""`` and the file name is
            ``<start_at>.html``.
        html_report_template (str): optional template path. When omitted, the
            bundled ``templates/extent_report_template.html`` located next to
            this module is used.

    NOTE(review): the part of the function visible here only computes
    ``html_report_template``, ``report_dir_path`` and ``html_report_name``;
    no rendering or return statement is shown.
    """
    if html_report_template:
        logger.log_info("render with html report template: {}".format(html_report_template))
    else:
        # Fall back to the template shipped alongside this module.
        package_dir = os.path.abspath(os.path.dirname(__file__))
        html_report_template = os.path.join(
            package_dir, "templates", "extent_report_template.html"
        )
        logger.log_debug("No html report template specified, use default.")

    logger.log_info("Start to render Html report ...")
    logger.log_debug("render data: {}".format(summary))

    report_dir_path = os.path.join(os.getcwd(), "reports")
    # ":" is swapped for "-" before the value is embedded in the file name.
    start_at_timestamp = summary["time"]["start_at"].replace(":", "-")

    if not html_report_name:
        summary["html_report_name"] = ""
        html_report_name = "{}.html".format(start_at_timestamp)
    else:
        summary["html_report_name"] = html_report_name
        # Named reports get their own sub-directory under "reports".
        report_dir_path = os.path.join(report_dir_path, html_report_name)
        html_report_name = html_report_name + "-{}.html".format(start_at_timestamp)
"elapsed_ms": response.elapsed.microseconds / 1000.0,
"content_size": content_size
}
# record request and response histories, include 30X redirection
response_list = response.history + [response]
self.meta_data["data"] = [
self.get_req_resp_record(resp_obj)
for resp_obj in response_list
]
self.meta_data["data"][0]["request"].update(kwargs)
try:
response.raise_for_status()
except RequestException as e:
logger.log_error(u"{exception}".format(exception=str(e)))
else:
logger.log_info(
"""status_code: {}, response_time(ms): {} ms, response_length: {} bytes\n""".format(
response.status_code,
response_time_ms,
content_size
)
)
return response
def module_hrun(name, base_url, module, receiver):
    """Build the test suite for the given modules and run it with HttpRunner.

    The original docstring describes this as running modules asynchronously;
    the scheduling mechanism itself is not visible in this block.

    :param name: not used in the visible part of this function
    :param base_url: passed through to ``run_by_module`` for every module entry
    :param module: iterable of indexable entries; element ``[0]`` of each entry
        is handed to ``run_by_module`` (presumably a module identifier --
        confirm against the caller)
    :param receiver: not used in the visible part of this function
    :return: the message ``'找不到模块信息'`` ("module information not found")
        when ``ObjectDoesNotExist`` is raised while preparing the suite;
        otherwise nothing is returned in the visible part
    """
    logger.setup_logger('INFO')
    runner = HttpRunner(failfast=False)
    module = list(module)

    # Test cases are generated under <cwd>/suite/<time stamp>.
    testcase_dir_path = os.path.join(os.getcwd(), "suite", get_time_stamp())

    try:
        for entry in module:
            run_by_module(entry[0], base_url, testcase_dir_path)
    except ObjectDoesNotExist:
        return '找不到模块信息'

    runner.run(testcase_dir_path)
def extract_output(self, output_variables_list):
    """Collect the requested output variables from the testcase variables mapping.

    Args:
        output_variables_list (list): names of the variables to output.

    Returns:
        dict: ``{name: value}`` for every requested name present in
            ``self.context.testcase_variables_mapping``. Names that are
            missing are skipped after a warning is logged.
    """
    known_variables = self.context.testcase_variables_mapping
    collected = {}

    for name in output_variables_list:
        if name in known_variables:
            collected[name] = known_variables[name]
        else:
            # A missing variable is reported but does not abort the extraction.
            logger.log_warning(
                "variable '{}' can not be found in variables mapping, failed to output!".format(name)
            )

    return collected