Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
{
"arguments": {
"src_pattern": "foo_{{ today }}.csv",
"with_vars": {"today": "date '+%Y%m%d'"},
},
"class": "SftpDownload",
"step": "sftp_download",
}
]
}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
is_completed_queue_creation = True
try:
manager = YamlScenarioManager(self._cmd_args)
manager.create_scenario_queue()
ScenarioQueue.step_queue.pop()
except Exception:
is_completed_queue_creation = False
else:
shutil.rmtree(self._pj_dir)
assert is_completed_queue_creation is True
def test_create_scenario_queue_ok_with_no_args(self):
"""
Valid scenario.yml with no arguments
"""
os.makedirs(self._pj_dir)
pj_yaml_dict = {
"scenario": [{"class": "SftpDownload", "step": "sftp_download"}]
}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
is_completed_queue_creation = True
try:
manager = YamlScenarioManager(self._cmd_args)
manager.create_scenario_queue()
ScenarioQueue.step_queue.pop()
except Exception:
is_completed_queue_creation = False
else:
shutil.rmtree(self._pj_dir)
assert is_completed_queue_creation is True
def test_scenario_manager_factory_ok_yml(self):
"""
Succeeded to create instance with yml
"""
manager = ScenarioManagerFactory.create(self._cmd_args)
assert isinstance(manager, type(YamlScenarioManager(self._cmd_args)))
"class": "FormAuth",
"form_id": "spam",
"form_password": "spam",
"form_url": "http://spam/",
},
},
}
]
}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml, default_flow_style=False))
is_completed_queue_creation = True
try:
manager = YamlScenarioManager(self._cmd_args)
manager.create_scenario_queue()
ScenarioQueue.step_queue.pop()
except Exception:
is_completed_queue_creation = False
else:
shutil.rmtree(self._pj_dir)
assert is_completed_queue_creation is True
def test_create_scenario_queue_with_no_list_ng(self):
"""
Invalid scenario.yml
"""
os.makedirs(self._pj_dir)
pj_yaml_dict = {"scenario": {"arguments", "spam"}}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
with pytest.raises(ScenarioFileInvalid) as excinfo:
manager = YamlScenarioManager(self._cmd_args)
manager.create_scenario_queue()
shutil.rmtree(self._pj_dir)
assert "invalid" in str(excinfo.value)
os.makedirs(self._pj_dir)
pj_yaml_dict = {
"scenario": [
{
"arguments": {"retry_count": 10},
"class": "SftpDownload",
"step": "sftp_download",
}
]
}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
is_completed_queue_creation = True
try:
manager = YamlScenarioManager(self._cmd_args)
manager.create_scenario_queue()
ScenarioQueue.step_queue.pop()
except Exception:
is_completed_queue_creation = False
else:
shutil.rmtree(self._pj_dir)
assert is_completed_queue_creation is True
def test_create_scenario_queue_ng(self):
"""
Invalid scenario.yml
"""
os.makedirs(self._pj_dir)
pj_yaml_dict = {"scenario": ["arguments", "spam"]}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
with pytest.raises(AttributeError) as excinfo:
manager = YamlScenarioManager(self._cmd_args)
manager.create_scenario_queue()
shutil.rmtree(self._pj_dir)
assert "object has no attribute" in str(excinfo.value)
"scenario": [
{
"step": "spam",
"class": "HttpDownload",
"arguments": {
"auth": {"form_id": "spam"},
"src_url": "https://spam/",
},
}
]
}
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml, default_flow_style=False))
with pytest.raises(ScenarioFileInvalid) as excinfo:
manager = YamlScenarioManager(self._cmd_args)
manager.create_scenario_queue()
shutil.rmtree(self._pj_dir)
assert "class: is not specified" in str(excinfo.value)