Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
cmn_yaml_dict = {
"scenario": [
{
"arguments": {"retry_count": 10},
"class": "SftpDownload",
"step": "sftp_download",
}
]
}
with open(self._cmn_scenario_file, "w") as f:
f.write(yaml.dump(cmn_yaml_dict, default_flow_style=False))
with pytest.raises(ScenarioFileInvalid) as excinfo:
parser = YamlScenarioParser(self._pj_scenario_file, self._cmn_scenario_file)
yaml_scenario = parser.parse()
any("step" in y for y in yaml_scenario)
shutil.rmtree(self._pj_dir)
shutil.rmtree(self._cmn_dir)
assert "invalid" in str(excinfo.value)
}
]
}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
cmn_yaml_dict = {
"scenario": [
{"arguments": {"retry_count": 10}, "class": "", "step": "sftp_download"}
]
}
with open(self._cmn_scenario_file, "w") as f:
f.write(yaml.dump(cmn_yaml_dict, default_flow_style=False))
with pytest.raises(ScenarioFileInvalid) as excinfo:
parser = YamlScenarioParser(self._pj_scenario_file, self._cmn_scenario_file)
yaml_scenario = parser.parse()
any("step" in y for y in yaml_scenario)
shutil.rmtree(self._pj_dir)
shutil.rmtree(self._cmn_dir)
assert "invalid" in str(excinfo.value)
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
cmn_yaml_dict = {
"scenario": [
{
"arguments": {"retry_count": 10},
"class": "SftpDownload",
"step": "sftp_download",
}
]
}
with open(self._cmn_scenario_file, "w") as f:
f.write(yaml.dump(cmn_yaml_dict, default_flow_style=False))
with pytest.raises(ScenarioFileInvalid) as excinfo:
parser = YamlScenarioParser(self._pj_scenario_file, self._cmn_scenario_file)
yaml_scenario = parser.parse()
any("step" in y for y in yaml_scenario)
shutil.rmtree(self._pj_dir)
shutil.rmtree(self._cmn_dir)
assert "invalid" in str(excinfo.value)
"step": "sftp_download",
}
]
}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
cmn_yaml_dict = {
"scenario": [{"class": "SftpDownload", "step": "sftp_download"}]
}
with open(self._cmn_scenario_file, "w") as f:
f.write(yaml.dump(cmn_yaml_dict, default_flow_style=False))
exists_step = True
try:
parser = YamlScenarioParser(self._pj_scenario_file, self._cmn_scenario_file)
yaml_scenario = parser.parse()
exists_step = any("step" in y for y in yaml_scenario)
except Exception:
exists_step = False
else:
shutil.rmtree(self._pj_dir)
shutil.rmtree(self._cmn_dir)
assert exists_step is True
cmn_yaml_dict = {
"scenario": [
{
"arguments": {"retry_count": 10},
"class": "SftpDownload",
"step": "sftp_download",
}
]
}
with open(self._cmn_scenario_file, "w") as f:
f.write(yaml.dump(cmn_yaml_dict, default_flow_style=False))
exists_step = True
try:
parser = YamlScenarioParser(self._pj_scenario_file, self._cmn_scenario_file)
yaml_scenario = parser.parse()
exists_step = any("step" in y for y in yaml_scenario)
except Exception:
exists_step = False
else:
shutil.rmtree(self._pj_dir)
shutil.rmtree(self._cmn_dir)
assert exists_step is True
cmn_yaml_dict = {
"scenario": [
{
"arguments": {"retry_count": 10},
"class": "SftpDownload",
"step": "sftp_download",
}
]
}
with open(self._cmn_scenario_file, "w") as f:
f.write(yaml.dump(cmn_yaml_dict, default_flow_style=False))
exists_step = True
try:
parser = YamlScenarioParser(self._pj_scenario_file, self._cmn_scenario_file)
yaml_scenario = parser.parse()
exists_step = any("step" in y for y in yaml_scenario)
except Exception:
exists_step = False
else:
shutil.rmtree(self._pj_dir)
shutil.rmtree(self._cmn_dir)
assert exists_step is True
os.makedirs(self._pj_dir)
pj_yaml_dict = {
"scenario": [
{
"arguments": {"retry_count": 10},
"class": "SftpDownload",
"step": "sftp_download",
}
]
}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
exists_step = True
try:
parser = YamlScenarioParser(self._pj_scenario_file, self._cmn_scenario_file)
yaml_scenario = parser.parse()
exists_step = any("step" in y for y in yaml_scenario)
except Exception:
exists_step = False
else:
shutil.rmtree(self._pj_dir)
assert exists_step is True
"step": "sample step",
}
]
}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
cmn_yaml_dict = {
"scenario": [{"class": "SftpDownload", "step": "sftp_download"}]
}
with open(self._cmn_scenario_file, "w") as f:
f.write(yaml.dump(cmn_yaml_dict, default_flow_style=False))
exists_step = True
try:
parser = YamlScenarioParser(self._pj_scenario_file, self._cmn_scenario_file)
yaml_scenario = parser.parse()
exists_step = any("step" in y for y in yaml_scenario)
except Exception:
exists_step = False
else:
shutil.rmtree(self._pj_dir)
shutil.rmtree(self._cmn_dir)
assert exists_step is True
def create_scenario_queue(self):
    """Validate prerequisites, parse the scenario files and invoke the steps.

    The project and common scenario file paths held on the instance are
    handed to ``YamlScenarioParser``; the parsed result is then passed to
    the private step-invocation helper.

    Raises:
        ScenarioFileInvalid: If the parser returns an empty value or
            something that is not a ``list``.

    NOTE(review): the two validation helpers presumably raise on missing
    directories/files -- their bodies are not visible here, confirm.
    """
    self._logger.info("Start to create scenario queue")

    # Pre-flight checks; these run before any scenario file is parsed.
    self.__valid_essential_dir()
    self.__valid_essential_files()

    # Parse the project-level and common scenario definitions together.
    scenario_steps = YamlScenarioParser(
        self._pj_scenario_file, self._cmn_scenario_file
    ).parse()

    # Guard clause: anything other than a non-empty list is rejected.
    if not (scenario_steps and isinstance(scenario_steps, list)):
        raise ScenarioFileInvalid("scenario.yml is invalid.")

    self.__invoke_steps(scenario_steps)
    self._logger.info("Finish to create scenario queue")