How to use the cliboa.core.manager.YamlScenarioManager class in cliboa

To help you get started, we’ve selected a few cliboa examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github BrainPad / cliboa / tests / core / test_manager.py View on Github external
{
                    "arguments": {
                        "src_pattern": "foo_{{ today }}.csv",
                        "with_vars": {"today": "date '+%Y%m%d'"},
                    },
                    "class": "SftpDownload",
                    "step": "sftp_download",
                }
            ]
        }
        with open(self._pj_scenario_file, "w") as f:
            f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))

        is_completed_queue_creation = True
        try:
            manager = YamlScenarioManager(self._cmd_args)
            manager.create_scenario_queue()
            ScenarioQueue.step_queue.pop()
        except Exception:
            is_completed_queue_creation = False
        else:
            shutil.rmtree(self._pj_dir)
        assert is_completed_queue_creation is True
github BrainPad / cliboa / tests / core / test_manager.py View on Github external
def test_create_scenario_queue_ok_with_no_args(self):
        """
        Valid scenario.yml with no arguments

        Writes a scenario file whose single step has no "arguments" key,
        builds a YamlScenarioManager from self._cmd_args, creates the
        scenario queue and pops one entry from ScenarioQueue.step_queue.
        Any exception raised along the way fails the test with its
        original traceback.
        """
        os.makedirs(self._pj_dir)
        try:
            pj_yaml_dict = {
                "scenario": [{"class": "SftpDownload", "step": "sftp_download"}]
            }
            with open(self._pj_scenario_file, "w") as f:
                f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))

            manager = YamlScenarioManager(self._cmd_args)
            manager.create_scenario_queue()
            ScenarioQueue.step_queue.pop()
        finally:
            # Always remove the project directory, even on failure, so a
            # broken run cannot make later os.makedirs calls fail.
            shutil.rmtree(self._pj_dir)
github BrainPad / cliboa / tests / core / test_factory.py View on Github external
def test_scenario_manager_factory_ok_yml(self):
        """
        Succeeded to create instance with yml

        ScenarioManagerFactory.create(self._cmd_args) must return an
        instance of YamlScenarioManager.
        """
        manager = ScenarioManagerFactory.create(self._cmd_args)
        # Compare against the class itself; there is no need to build a
        # second, throwaway manager just to obtain its type.
        assert isinstance(manager, YamlScenarioManager)
github BrainPad / cliboa / tests / core / test_manager.py View on Github external
"class": "FormAuth",
                            "form_id": "spam",
                            "form_password": "spam",
                            "form_url": "http://spam/",
                        },
                    },
                }
            ]
        }

        with open(self._pj_scenario_file, "w") as f:
            f.write(yaml.dump(pj_yaml, default_flow_style=False))

        is_completed_queue_creation = True
        try:
            manager = YamlScenarioManager(self._cmd_args)
            manager.create_scenario_queue()
            ScenarioQueue.step_queue.pop()
        except Exception:
            is_completed_queue_creation = False
        else:
            shutil.rmtree(self._pj_dir)
        assert is_completed_queue_creation is True
github BrainPad / cliboa / tests / core / test_manager.py View on Github external
def test_create_scenario_queue_with_no_list_ng(self):
        """
        Invalid scenario.yml

        The "scenario" entry written to the file is not a list.
        Constructing the manager and creating the scenario queue is
        expected to raise ScenarioFileInvalid with a message containing
        "invalid".
        """
        os.makedirs(self._pj_dir)
        try:
            # NOTE(review): {"arguments", "spam"} is a set literal, not a
            # dict; confirm whether {"arguments": "spam"} was intended.
            pj_yaml_dict = {"scenario": {"arguments", "spam"}}
            with open(self._pj_scenario_file, "w") as f:
                f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))

            with pytest.raises(ScenarioFileInvalid) as excinfo:
                manager = YamlScenarioManager(self._cmd_args)
                manager.create_scenario_queue()
        finally:
            # Clean up even when the expected exception is not raised;
            # otherwise the leftover directory breaks later tests.
            shutil.rmtree(self._pj_dir)
        assert "invalid" in str(excinfo.value)
github BrainPad / cliboa / tests / core / test_manager.py View on Github external
os.makedirs(self._pj_dir)
        pj_yaml_dict = {
            "scenario": [
                {
                    "arguments": {"retry_count": 10},
                    "class": "SftpDownload",
                    "step": "sftp_download",
                }
            ]
        }
        with open(self._pj_scenario_file, "w") as f:
            f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))

        is_completed_queue_creation = True
        try:
            manager = YamlScenarioManager(self._cmd_args)
            manager.create_scenario_queue()
            ScenarioQueue.step_queue.pop()
        except Exception:
            is_completed_queue_creation = False
        else:
            shutil.rmtree(self._pj_dir)
        assert is_completed_queue_creation is True
github BrainPad / cliboa / tests / core / test_manager.py View on Github external
def test_create_scenario_queue_ng(self):
        """
        Invalid scenario.yml

        The "scenario" list contains plain strings instead of step
        mappings. Constructing the manager and creating the scenario
        queue is expected to raise AttributeError with a message
        containing "object has no attribute".
        """
        os.makedirs(self._pj_dir)
        try:
            pj_yaml_dict = {"scenario": ["arguments", "spam"]}
            with open(self._pj_scenario_file, "w") as f:
                f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))

            with pytest.raises(AttributeError) as excinfo:
                manager = YamlScenarioManager(self._cmd_args)
                manager.create_scenario_queue()
        finally:
            # Clean up even when the expected exception is not raised;
            # otherwise the leftover directory breaks later tests.
            shutil.rmtree(self._pj_dir)
        assert "object has no attribute" in str(excinfo.value)
github BrainPad / cliboa / tests / core / test_manager.py View on Github external
"scenario": [
                {
                    "step": "spam",
                    "class": "HttpDownload",
                    "arguments": {
                        "auth": {"form_id": "spam"},
                        "src_url": "https://spam/",
                    },
                }
            ]
        }
        with open(self._pj_scenario_file, "w") as f:
            f.write(yaml.dump(pj_yaml, default_flow_style=False))

        with pytest.raises(ScenarioFileInvalid) as excinfo:
            manager = YamlScenarioManager(self._cmd_args)
            manager.create_scenario_queue()
        shutil.rmtree(self._pj_dir)
        assert "class: is not specified" in str(excinfo.value)