Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __add_jsr_elements(children, req):
    """
    Append one JSR223 element, followed by an empty "hashTree" element, to
    `children` for every entry found under the request's "jsr223" config key.

    :type children: etree.Element
    :type req: Request
    :raise TaurusConfigError: when an entry provides neither 'script-file'
        nor 'script-text'
    """
    jsr_items = req.config.get("jsr223", [])
    # A lone (non-list) value is handled as a list with a single entry.
    if not isinstance(jsr_items, list):
        jsr_items = [jsr_items]

    for position in range(len(jsr_items)):
        # ensure_is_dict is given 'script-text' as the sub-key for this entry.
        item = ensure_is_dict(jsr_items, position, sub_key='script-text')

        language = item.get("language", "groovy")
        file_name = item.get("script-file", None)
        text = item.get("script-text", None)
        if not (file_name or text):
            raise TaurusConfigError("jsr223 element must specify one of 'script-file' or 'script-text'")

        params = item.get("parameters", "")
        when = item.get("execute", "after")
        # The boolean-ish setting is passed on as a lowercase string.
        compile_cache = str(item.get("compile-cache", True)).lower()

        element = JMX._get_jsr223_element(language, file_name, params, when, text, compile_cache)
        children.append(element)
        children.append(etree.Element("hashTree"))
def _gen_xpath_assertions(self, request):
    """
    Build one AST expression statement per "assert-xpath" entry of the request.

    Each statement is a call to ``response.assert_xpath`` (or
    ``response.assert_not_xpath`` when the entry's 'invert' flag is truthy)
    with the XPath query as the positional argument and ``parser_type`` /
    ``validate`` as keyword arguments.

    :param request: object whose ``config`` holds the "assert-xpath" entries
    :return: list of ``ast.Expr`` nodes, in the order of the config entries
    """
    xpath_assertions = request.config.get("assert-xpath", [])
    statements = []

    for idx, _ in enumerate(xpath_assertions):
        item = ensure_is_dict(xpath_assertions, idx, "xpath")

        # The exception instance is handed over as the lookup default;
        # presumably the config dict raises it when 'xpath' is absent - confirm.
        missing = TaurusConfigError('XPath not found in assertion: %s' % item)
        xpath = item.get('xpath', missing)

        if item.get('use-tolerant-parser', True):
            parser_type = 'html'
        else:
            parser_type = 'xml'
        validate = item.get('validate-xml', False)

        if item.get('invert', False):
            method = "assert_not_xpath"
        else:
            method = "assert_xpath"

        target = ast_attr("response.%s" % method)
        positional = [self._gen_expr(xpath)]
        named = [
            ast.keyword(arg="parser_type", value=self._gen_expr(parser_type)),
            ast.keyword(arg="validate", value=self._gen_expr(validate)),
        ]
        statements.append(ast.Expr(ast_call(func=target, args=positional, keywords=named)))

    return statements
def __add_regexp_ext(self, children, req):
    """
    Append a regexp extractor element, followed by an empty "hashTree"
    element, to `children` for every variable listed under the request's
    "extract-regexp" config key.

    :param children: container that receives the generated elements
    :param req: request whose ``config`` holds the "extract-regexp" mapping
    """
    # NOTE(review): no default is supplied for this lookup; the loop below
    # relies on the config returning something iterable - confirm.
    regexp_cfgs = req.config.get("extract-regexp")

    for name in regexp_cfgs:
        settings = ensure_is_dict(regexp_cfgs, name, "regexp")

        scope = settings.get("scope", None)
        from_var = settings.get("from-variable", None)

        subject = settings.get('subject', 'body')
        pattern = settings['regexp']
        template = settings.get('template', 1)
        match_no = settings.get('match-no', 1)
        fallback = settings.get('default', 'NOT_FOUND')

        element = JMX._get_extractor(name, subject, pattern, template, match_no, fallback, scope, from_var)
        children.append(element)
        children.append(etree.Element("hashTree"))
# Generates statements for the "assert" entries of a request's config.
# NOTE(review): this excerpt is cut off - the code that consumes `method` and
# returns `stmts` is not visible here.
def _gen_assertions(self, request):
stmts = []
assertions = request.config.get("assert", [])
for idx, assertion in enumerate(assertions):
# Re-read the entry through ensure_is_dict, passing "contains" as its key argument.
assertion = ensure_is_dict(assertions, idx, "contains")
# Normalise a single "contains" value into a one-element list.
if not isinstance(assertion['contains'], list):
assertion['contains'] = [assertion['contains']]
subject = assertion.get("subject", Scenario.FIELD_BODY)
# Only the body and headers subjects are looked up in the table below.
if subject in (Scenario.FIELD_BODY, Scenario.FIELD_HEADERS):
for member in assertion["contains"]:
# Key: (subject, 'regexp' flag, 'not' flag) -> name of the assertion method.
func_table = {
(Scenario.FIELD_BODY, False, False): "assert_in_body",
(Scenario.FIELD_BODY, False, True): "assert_not_in_body",
(Scenario.FIELD_BODY, True, False): "assert_regex_in_body",
(Scenario.FIELD_BODY, True, True): "assert_regex_not_in_body",
(Scenario.FIELD_HEADERS, False, False): "assert_in_headers",
(Scenario.FIELD_HEADERS, False, True): "assert_not_in_headers",
(Scenario.FIELD_HEADERS, True, False): "assert_regex_in_headers",
(Scenario.FIELD_HEADERS, True, True): "assert_regex_not_in_headers",
}
# 'regexp' defaults to True and 'not' defaults to False when the entry omits them.
method = func_table[(subject, assertion.get('regexp', True), assertion.get('not', False))]
# Generates assignment statements that store extracted response values into variables.
# NOTE(review): this excerpt is cut off after the regexp loop - no return
# statement is visible here.
def _gen_extractors(self, request):
stmts = []
# JSONPath extractors: one assignment per variable name under "extract-jsonpath".
jextractors = request.config.get("extract-jsonpath")
for varname in jextractors:
cfg = ensure_is_dict(jextractors, varname, "jsonpath")
# <varname> = response.extract_jsonpath(<jsonpath>, <default>), default 'NOT_FOUND'.
stmts.append(ast.Assign(
targets=[self.expr_compiler.gen_var_accessor(varname, ast.Store())],
value=ast_call(
func=ast_attr("response.extract_jsonpath"),
args=[self._gen_expr(cfg['jsonpath']), self._gen_expr(cfg.get('default', 'NOT_FOUND'))])))
# Regexp extractors: one assignment per variable name under "extract-regexp".
extractors = request.config.get("extract-regexp")
for varname in extractors:
cfg = ensure_is_dict(extractors, varname, "regexp")
# TODO: support non-'body' value of 'subject'
# <varname> = response.extract_regex(<regexp>, <default>), default 'NOT_FOUND'.
stmts.append(ast.Assign(
targets=[self.expr_compiler.gen_var_accessor(varname, ast.Store())],
value=ast_call(
func=ast_attr("response.extract_regex"),
args=[self._gen_expr(cfg['regexp']), self._gen_expr(cfg.get('default', 'NOT_FOUND'))])))
def _gen_jsonpath_assertions(self, request):
    """
    Build one AST expression statement per "assert-jsonpath" entry of the request.

    Each statement is a call to ``response.assert_jsonpath`` (or
    ``response.assert_not_jsonpath`` when the entry's 'invert' flag is truthy)
    with the JSONPath query as the positional argument and the entry's
    'expected-value' (None when absent) as the ``expected_value`` keyword.

    :param request: object whose ``config`` holds the "assert-jsonpath" entries
    :return: list of ``ast.Expr`` nodes, in the order of the config entries
    """
    configured = request.config.get("assert-jsonpath", [])
    statements = []

    for idx, _ in enumerate(configured):
        item = ensure_is_dict(configured, idx, "jsonpath")

        # The exception instance is handed over as the lookup default;
        # presumably the config dict raises it when 'jsonpath' is absent - confirm.
        missing = TaurusConfigError('JSON Path not found in assertion: %s' % item)
        path = item.get('jsonpath', missing)
        expected = item.get('expected-value', None)

        if item.get('invert', False):
            method = "assert_not_jsonpath"
        else:
            method = "assert_jsonpath"

        target = ast_attr("response.%s" % method)
        positional = [self._gen_expr(path)]
        named = [ast.keyword(arg="expected_value", value=self._gen_expr(expected))]
        statements.append(ast.Expr(ast_call(func=target, args=positional, keywords=named)))

    return statements
def get_raw_load(self):
    """
    Collect the load settings from ``self.execution`` as they are configured.

    Throughput and concurrency are first passed through ``ensure_is_dict``
    with the provisioning type as the key argument, then read back for that
    provisioning type. The remaining settings are read directly; anything
    absent yields None, and ``duration`` is always None.

    :return: ``self.LOAD_FMT`` instance populated with the collected values
    """
    provisioning = self.engine.config.get(Provisioning.PROV)

    for load_key in (ScenarioExecutor.THRPT, ScenarioExecutor.CONCURR):
        ensure_is_dict(self.execution, load_key, provisioning)

    # Lookups are kept in the same order as before; dict literals evaluate
    # their values top to bottom.
    load = {
        "throughput": self.execution.get(ScenarioExecutor.THRPT).get(provisioning, None),
        "concurrency": self.execution.get(ScenarioExecutor.CONCURR).get(provisioning, None),
        "iterations": self.execution.get("iterations", None),
        "steps": self.execution.get(ScenarioExecutor.STEPS, None),
        "hold": self.execution.get(ScenarioExecutor.HOLD_FOR, None),
        "ramp_up": self.execution.get(ScenarioExecutor.RAMP_UP, None),
        "duration": None,
    }
    return self.LOAD_FMT(**load)
# NOTE(review): this excerpt starts mid-function - the `def` line and the
# initialisation of `stmts` are not visible here.
# Regexp extractors: one assignment per variable name under "extract-regexp".
extractors = request.config.get("extract-regexp")
for varname in extractors:
cfg = ensure_is_dict(extractors, varname, "regexp")
# TODO: support non-'body' value of 'subject'
# <varname> = response.extract_regex(<regexp>, <default>), default 'NOT_FOUND'.
stmts.append(ast.Assign(
targets=[self.expr_compiler.gen_var_accessor(varname, ast.Store())],
value=ast_call(
func=ast_attr("response.extract_regex"),
args=[self._gen_expr(cfg['regexp']), self._gen_expr(cfg.get('default', 'NOT_FOUND'))])))
# TODO: css/jquery extractor?
# XPath extractors: one assignment per variable name under "extract-xpath".
xpath_extractors = request.config.get("extract-xpath")
for varname in xpath_extractors:
cfg = ensure_is_dict(xpath_extractors, varname, "xpath")
# 'html' parser unless 'use-tolerant-parser' is explicitly falsy; validation is off by default.
parser_type = 'html' if cfg.get('use-tolerant-parser', True) else 'xml'
validate = cfg.get('validate-xml', False)
# NOTE(review): the keyword values below are passed as plain Python values,
# whereas _gen_xpath_assertions wraps the equivalent values in
# self._gen_expr(...) - confirm ast_call / the code generator accepts both.
stmts.append(ast.Assign(
targets=[self.expr_compiler.gen_var_accessor(varname, ast.Store())],
value=ast_call(
func=ast_attr("response.extract_xpath"),
args=[self._gen_expr(cfg['xpath'])],
keywords=[ast.keyword(arg="default", value=cfg.get('default', 'NOT_FOUND')),
ast.keyword(arg="parser_type", value=parser_type),
ast.keyword(arg="validate", value=validate)])))
return stmts
# NOTE(review): this excerpt starts mid-function - `req`, `task`, `method`,
# `timeout` and `global_headers` are defined outside the visible lines.
assertions = req.config.get("assert", [])
first_assert = True
# With assertions the request is emitted as a `with ... as response:` block
# (catch_response=True); without them it is a plain client call.
if assertions:
statement = 'with self.client.%s(%s, catch_response=True) as response:'
else:
statement = "self.client.%s(%s)"
headers = OrderedDict()
# Global headers go in first, sorted by header name ...
if global_headers:
sorted_headers = OrderedDict(sorted(global_headers.items(), key=lambda t: t[0]))
headers.update(sorted_headers)
# ... then the request's own headers, which overwrite global ones with the same name.
if req.headers:
headers.update(req.headers)
task.append(self.gen_statement(statement % (method, self.__get_params_line(req, timeout, headers))))
for idx, assertion in enumerate(assertions):
assertion = ensure_is_dict(assertions, idx, "contains")
# Normalise a single "contains" value into a one-element list.
if not isinstance(assertion['contains'], list):
assertion['contains'] = [assertion['contains']]
# first_assert is True only for the first assertion generated.
self.__gen_assertion(task, assertion, first_assert)
first_assert = False
# Close the generated assertion chain with an `else:` branch that calls response.success().
if assertions:
task.append(self.gen_statement('else:', indent=12))
task.append(self.gen_statement('response.success()', indent=16))
def _gen_extractors(self, request):
stmts = []
jextractors = request.config.get("extract-jsonpath")
for varname in jextractors:
cfg = ensure_is_dict(jextractors, varname, "jsonpath")
stmts.append(ast.Assign(
targets=[self.expr_compiler.gen_var_accessor(varname, ast.Store())],
value=ast_call(
func=ast_attr("response.extract_jsonpath"),
args=[self._gen_expr(cfg['jsonpath']), self._gen_expr(cfg.get('default', 'NOT_FOUND'))])))
extractors = request.config.get("extract-regexp")
for varname in extractors:
cfg = ensure_is_dict(extractors, varname, "regexp")
# TODO: support non-'body' value of 'subject'
stmts.append(ast.Assign(
targets=[self.expr_compiler.gen_var_accessor(varname, ast.Store())],
value=ast_call(
func=ast_attr("response.extract_regex"),
args=[self._gen_expr(cfg['regexp']), self._gen_expr(cfg.get('default', 'NOT_FOUND'))])))
# TODO: css/jquery extractor?
xpath_extractors = request.config.get("extract-xpath")
for varname in xpath_extractors:
cfg = ensure_is_dict(xpath_extractors, varname, "xpath")
parser_type = 'html' if cfg.get('use-tolerant-parser', True) else 'xml'
validate = cfg.get('validate-xml', False)
stmts.append(ast.Assign(
targets=[self.expr_compiler.gen_var_accessor(varname, ast.Store())],