Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
assert_message = "'%s' " % val
if not reverse:
assert_message += 'not '
assert_message += 'found in BODY'
if regexp:
if reverse:
method = "self.assertEqual"
else:
method = "self.assertNotEqual"
assertion_elements.append(
ast.Assign(
targets=[ast.Name(id="re_pattern")],
value=ast_call(
func=ast_attr("re.compile"),
args=[ast.Str(val)])))
assertion_elements.append(ast.Expr(
ast_call(
func=ast_attr(method),
args=[
ast.Num(0),
ast_call(
func=ast.Name(id="len"),
args=[ast_call(
func=ast_attr("re.findall"),
args=[ast.Name(id="re_pattern"), ast.Name(id="body")])]),
ast.Str("Assertion: %s" % assert_message)])))
else:
if reverse:
def _gen_extractors(self, request):
stmts = []
jextractors = request.config.get("extract-jsonpath")
for varname in jextractors:
cfg = ensure_is_dict(jextractors, varname, "jsonpath")
stmts.append(ast.Assign(
targets=[self.expr_compiler.gen_var_accessor(varname, ast.Store())],
value=ast_call(
func=ast_attr("response.extract_jsonpath"),
args=[self._gen_expr(cfg['jsonpath']), self._gen_expr(cfg.get('default', 'NOT_FOUND'))])))
extractors = request.config.get("extract-regexp")
for varname in extractors:
cfg = ensure_is_dict(extractors, varname, "regexp")
# TODO: support non-'body' value of 'subject'
stmts.append(ast.Assign(
targets=[self.expr_compiler.gen_var_accessor(varname, ast.Store())],
value=ast_call(
func=ast_attr("response.extract_regex"),
args=[self._gen_expr(cfg['regexp']), self._gen_expr(cfg.get('default', 'NOT_FOUND'))])))
# TODO: css/jquery extractor?
xpath_extractors = request.config.get("extract-xpath")
for varname in xpath_extractors:
if subject in (Scenario.FIELD_BODY, Scenario.FIELD_HEADERS):
for member in assertion["contains"]:
func_table = {
(Scenario.FIELD_BODY, False, False): "assert_in_body",
(Scenario.FIELD_BODY, False, True): "assert_not_in_body",
(Scenario.FIELD_BODY, True, False): "assert_regex_in_body",
(Scenario.FIELD_BODY, True, True): "assert_regex_not_in_body",
(Scenario.FIELD_HEADERS, False, False): "assert_in_headers",
(Scenario.FIELD_HEADERS, False, True): "assert_not_in_headers",
(Scenario.FIELD_HEADERS, True, False): "assert_regex_in_headers",
(Scenario.FIELD_HEADERS, True, True): "assert_regex_not_in_headers",
}
method = func_table[(subject, assertion.get('regexp', True), assertion.get('not', False))]
stmts.append(ast.Expr(
ast_call(
func=ast_attr("response.%s" % method),
args=[self._gen_expr(member)])))
elif subject == Scenario.FIELD_RESP_CODE:
for member in assertion["contains"]:
method = "assert_status_code" if not assertion.get('not', False) else "assert_not_status_code"
stmts.append(ast.Expr(
ast_call(
func=ast_attr("response.%s" % method),
args=[self._gen_expr(member)])))
return stmts
def _gen_locator(self, tag, selector):
return ast_call(
func=ast_attr("self.driver.find_element"),
args=[
ast_attr("By.%s" % self.BYS[tag]),
self._gen_expr(selector)])
ast_call(func=ast_attr("options.set_headless")))]
body = [ast.Assign(targets=[ast_attr("self.driver")], value=ast_attr("None"))]
if browser == 'firefox':
body.append(ast.Assign(
targets=[ast.Name(id="options")],
value=ast_call(
func=ast_attr("webdriver.FirefoxOptions"))))
body.extend(headless_setup)
body.append(ast.Assign(
targets=[ast.Name(id="profile")],
value=ast_call(func=ast_attr("webdriver.FirefoxProfile"))))
body.append(ast.Expr(
ast_call(
func=ast_attr("profile.set_preference"),
args=[ast.Str("webdriver.log.file"), ast.Str(self.wdlog)])))
body.append(ast.Assign(
targets=[ast_attr("self.driver")],
value=ast_call(
func=ast_attr("webdriver.Firefox"),
args=[ast.Name(id="profile")],
keywords=[ast.keyword(
arg="firefox_options",
value=ast.Name(id="options"))])))
elif browser == 'chrome':
body.append(ast.Assign(
targets=[ast.Name(id="options")],
value=ast_call(
func=ast_attr("webdriver.ChromeOptions"))))
body.append(ast.Expr(
def _gen_select_mngr(self, param, selectors):
elements = [self._gen_get_locators("var_loc_select", selectors), ast_call(
func=ast_attr(
fields=(
ast_call(func="Select", args=[self._gen_dynamic_locator("var_loc_select")]),
"select_by_visible_text")),
args=[self._gen_expr(param)])]
return elements
ast_call(
func=ast_attr("self.driver.implicitly_wait"),
args=[ast.Num(self._get_scenario_timeout())])))
mgr = "WindowManager"
if mgr in self.selenium_extras:
body.append(ast.Assign(
targets=[ast_attr("self.wnd_mng")],
value=ast_call(
func=ast.Name(id=mgr),
args=[ast_attr("self.driver")])))
mgr = "FrameManager"
if mgr in self.selenium_extras:
body.append(ast.Assign(
targets=[ast_attr("self.frm_mng")],
value=ast_call(
func=ast.Name(id=mgr),
args=[ast_attr("self.driver")])))
self.selenium_extras.add("LocatorsManager")
mgr = "LocatorsManager"
body.append(ast.Assign(
targets=[ast_attr("self.loc_mng")],
value=ast_call(
func=ast.Name(id=mgr),
args=[ast_attr("self.driver"), ast.Str(self._get_scenario_timeout())])))
return body
func=ast_attr("reader_%s.get_vars" % (idx + 1)))])
data_sources.append(ast.Expr(extend_vars))
handlers = []
if self.generate_markers:
func_name = "add_flow_markers"
self.selenium_extras.add(func_name)
handlers.append(ast.Expr(ast_call(func=func_name)))
stored_vars = {"func_mode": str(False)} # todo: make func_mode optional
if target_init:
if self.test_mode == "selenium":
stored_vars["driver"] = "self.driver"
store_call = ast_call(
func=ast_attr("apiritif.put_into_thread_store"),
keywords=[ast.keyword(arg=key, value=ast_attr(stored_vars[key])) for key in stored_vars],
args=[])
store_block = [ast.Expr(store_call)]
setup = ast.FunctionDef(
name="setUp",
args=[ast_attr("self")],
body=target_init + data_sources + handlers + store_block,
decorator_list=[])
return [setup, gen_empty_line_stmt()]
value=ast_call(
func=ast.Name(id=mgr),
args=[ast_attr("self.driver")])))
mgr = "FrameManager"
if mgr in self.selenium_extras:
body.append(ast.Assign(
targets=[ast_attr("self.frm_mng")],
value=ast_call(
func=ast.Name(id=mgr),
args=[ast_attr("self.driver")])))
self.selenium_extras.add("LocatorsManager")
mgr = "LocatorsManager"
body.append(ast.Assign(
targets=[ast_attr("self.loc_mng")],
value=ast_call(
func=ast.Name(id=mgr),
args=[ast_attr("self.driver"), ast.Str(self._get_scenario_timeout())])))
return body
def _gen_class_teardown(self):
body = [
ast.If(
test=ast_attr("self.driver"),
body=ast.Expr(ast_call(func=ast_attr("self.driver.quit"))), orelse=[])]
return ast.FunctionDef(name="tearDown", args=[ast_attr("self")], body=body, decorator_list=[])