Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _run_test(self, file, variables):
    """
    Prepare and run one test file, reporting the outcome to the log storage.

    :param file: test file name; concatenated into the log messages and
                 passed to the log storage.
    :param variables: variables handed to ``self.prepare_test``.
    :return: ``(test, passed)``. ``test`` is None when ``prepare_test`` itself
             raised. A skipped test is reported as passed.
    """
    prepared = None
    passed = True
    try:
        prepared = self.prepare_test(file, variables)
        logger.log_storage.test_start(file)
        prepared.run()
        success_msg = 'Test ' + file + logger.green(' passed.')
        info(success_msg)
        logger.log_storage.test_end(file, True)
    except SkipException:
        # A skip is not a failure: it is logged and stored as a successful end.
        skip_msg = 'Test ' + file + logger.yellow(' skipped.')
        info(skip_msg)
        logger.log_storage.test_end(file, True, end_comment='Skipped')
    except Exception as err:
        reason = str(err)
        warning('Test ' + file + logger.red(' failed: ') + reason)
        debug(traceback.format_exc())
        logger.log_storage.test_end(file, False, str(err))
        passed = False
    return prepared, passed
def render(source: str, variables: dict) -> str:
    """
    Render ``source`` as a template with ``variables``.

    Before rendering, every filter and function exposed by a fresh
    ``FiltersHolder`` is registered on the template's environment filters and
    on the template globals respectively.

    :param source: template text.
    :param variables: values passed to the template's ``render``.
    :return: the rendered text, or ``source`` unchanged when rendering raises
             ``UndefinedError`` (the error message is logged at debug level).
    """
    tmpl = Template(source)
    extensions = FiltersHolder()
    # Item views are consumed as (name, callable) pairs, one assignment each.
    tmpl.environment.filters.update(extensions.filters.items())
    tmpl.globals.update(extensions.functions.items())
    try:
        rendered = tmpl.render(variables)
    except UndefinedError as err:
        debug(err.message)
        rendered = source
    return rendered
def _form_request(self, url, variables: dict) -> dict:
    """
    Build the keyword arguments for the HTTP request.

    :param url: target url; only used in the debug message here.
    :param variables: variables used to fill header templates, files and body.
    :return: dict with ``verify``, ``headers``, ``files``, ``timeout`` and
             either ``json`` (dict body with an ``application/json`` content
             type) or ``data`` (every other body, including None).
    """
    headers = {}
    for raw_name, raw_value in self.headers.items():
        rendered_name = fill_template_str(raw_name, variables)
        headers[rendered_name] = fill_template_str(raw_value, variables)

    rq = {
        'verify': self.verify,
        'headers': headers,
        'files': self.__form_files(variables),
    }
    # The json flag returned with the body does not affect the choice below:
    # only a dict body sent as application/json goes to 'json'.
    isjson, body = self.__form_body(variables)
    debug('http ' + str(self.method) + ' ' + str(url) + ', ' + str(headers) + ', ' + str(body))
    content_type = self.__get_content_type(headers)

    manual_json = isinstance(body, dict) and content_type == 'application/json'
    if manual_json:
        # json body formed manually via python dict
        rq['json'] = body
    else:
        # json string, form-data dict, raw body or None
        rq['data'] = body

    rq['timeout'] = self.timeout
    return rq
def _run_finally(cls, test, file, result: bool):
    """
    Run the cleanup (``final``) part of a test, if it has one.

    Nothing happens when ``test`` is falsy or has no ``final`` section.
    Cleanup failures are logged and stored but never propagated.

    :param test: test object, or None.
    :param file: test file name used in log messages and log storage.
    :param result: outcome of the main test run, forwarded to ``run_finally``.
    """
    if not test or not test.final:
        return

    # NOTE(review): the start record uses '<file>_cleanup' as test_type while
    # the end records use '<file> [cleanup]' - confirm this is intended.
    logger.log_storage.test_start(file, test_type='{}_cleanup'.format(file))
    try:
        test.run_finally(result)
        ok_msg = 'Test ' + file + ' [cleanup] ' + logger.green(' passed.')
        info(ok_msg)
        logger.log_storage.test_end(file, True, test_type='{} [cleanup]'.format(file))
    except Exception as err:
        fail_msg = 'Test ' + file + ' [cleanup] ' + logger.red(' failed: ') + str(err)
        warning(fail_msg)
        debug(traceback.format_exc())
        logger.log_storage.test_end(file, False, test_type='{} [cleanup]'.format(file))
def _run_include(test, file, include_file, ignore_errors):
    """
    Run include as soon as it was included (run on include).

    :param test: include's test object; ``test.run()`` is executed and
                 ``test.path`` is reported to the log storage.
    :param file: name used in the skip / failure messages.
    :param include_file: mapping describing the include; its ``'file'`` entry
                         is reported as the started test.
    :param ignore_errors: when truthy, a failing include returns False
                          instead of raising.
    :return: True when the include ran or was skipped, False when it failed
             and ``ignore_errors`` is set.
    :raises Exception: when the include failed and ``ignore_errors`` is not
                       set; the original error is attached as the cause.
    """
    try:
        logger.log_storage.test_start(include_file['file'], test_type='include')
        debug('Run include: ' + str(test))
        res = test.run()
        logger.log_storage.test_end(test.path, True, res, test_type='include')
        return True
    except SkipException:
        info('Include ' + file + logger.yellow(' skipped.'))
        logger.log_storage.test_end(file, True, end_comment='Skipped')
        return True
    except Exception as e:
        logger.log_storage.test_end(test.path, False, str(e), test_type='include')
        if not ignore_errors:
            # Chain explicitly so the original traceback is kept as the cause.
            raise Exception('Include ' + file + ' failed: ' + str(e)) from e
        return False
def get_messages(consumer: SimpleConsumer, where: Operator or None, variables, timeout) -> dict or None:
    """
    Poll ``consumer`` until a message accepted by ``Kafka.check_message`` arrives.

    Each round fetches, then walks the consumer's messages; every message
    value is decoded as utf-8 and passed through ``try_get_object``. Between
    unsuccessful rounds the function sleeps one second and decrements
    ``timeout`` by one. Offsets are committed on every exit path.

    :param consumer: consumer to fetch from and iterate over.
    :param where: condition forwarded to ``Kafka.check_message``.
    :param variables: variables forwarded to ``Kafka.check_message``.
    :param timeout: number of one-second waits allowed between rounds.
    :return: the first matching decoded message, or None once ``timeout``
             is no longer positive.
    """
    rounds_left = timeout
    try:
        while True:
            consumer.fetch()
            for record in consumer:
                payload = try_get_object(record.value.decode('utf-8'))
                debug(payload)
                if Kafka.check_message(where, payload, variables):
                    return payload
            if not rounds_left > 0:
                return None
            sleep(1)
            rounds_left -= 1
    finally:
        consumer.commit_offsets()
def run_loop(self, includes, variables):
    """
    Repeat all actions until they succeed together or the time limit expires.

    Every attempt starts from a fresh deep copy of ``variables``, so a failed
    attempt never leaks partial changes into the next one and the caller's
    object is not mutated by this loop.

    :param includes: passed through to every action.
    :param variables: initial variables for each attempt.
    :return: the variables produced by the first attempt in which every
             action succeeded.
    :raises TimeoutError: when more than ``self.delay`` seconds have elapsed
                          before an attempt starts.
    """
    # A monotonic clock keeps the limit correct if the system clock is adjusted.
    deadline = time.monotonic() + self.delay
    while True:
        if time.monotonic() > deadline:  # time limit reached
            raise TimeoutError('wait step timed out after {} seconds'.format(self.delay))
        loop_vars = deepcopy(variables)  # start every loop from the same variables
        try:
            for action in self._actions:
                loop_vars = action.action(includes, loop_vars)
        except Exception as e:
            # Any failure just means "not ready yet": log it and retry.
            debug('wait step failure {}'.format(e))
        else:
            return loop_vars  # loop was successful - return modified variables
def _prepare_include_vars(test, global_variables):
    """
    Compute the variables an include brings into a test.

    When ``test.include`` is set, its ``variables`` are template-filled with
    ``global_variables`` and parsed with ``try_get_object``; otherwise the
    result is an empty dict. Keys reported by ``report_override`` for
    ``test.variables`` versus the include variables are logged at debug level.

    :param test: test object providing ``include`` and ``variables``.
    :param global_variables: variables used to fill the include's template.
    :return: the include variables.
    """
    include_vars = {}
    if test.include:
        filled = fill_template_str(test.include.variables, global_variables)
        include_vars = try_get_object(filled)

    overridden = report_override(test.variables, include_vars)
    if overridden:
        debug('Include variables override these variables: ' + str(overridden))
    return include_vars