How to use the cliboa.util.helper.Helper.set_property function in cliboa

To help you get started, we’ve selected a few cliboa examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github BrainPad / cliboa / tests / scenario / transform / test_file.py View on Github external
def test_excute_ng_multiple_src(self):
        with pytest.raises(InvalidCount) as execinfo:
            try:
                # create test file
                os.makedirs(self._data_dir)
                excel_file = os.path.join(self._data_dir, "test1.xlxs")
                open(excel_file, "w").close()
                excel_file2 = os.path.join(self._data_dir, "test2.xlxs")
                open(excel_file2, "w").close()

                # set the essential attributes
                instance = ExcelConvert()
                Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
                Helper.set_property(instance, "src_dir", self._data_dir)
                Helper.set_property(instance, "src_pattern", "test(.*)\.xlxs")
                Helper.set_property(instance, "dest_dir", self._data_dir)
                Helper.set_property(instance, "dest_pattern", "test(.*).xlxs")
                instance.execute()
            finally:
                shutil.rmtree(self._data_dir)
        assert "must be only one" in str(execinfo.value)
github BrainPad / cliboa / tests / scenario / transform / test_file.py View on Github external
def test_execute_ng_no_multiple_files(self):
        # create test files
        os.makedirs(self._data_dir)
        test1_csv = os.path.join(self._data_dir, "test1.csv")
        test2_csv = os.path.join(self._data_dir, "test2.csv")
        open(test1_csv, "w").close
        open(test2_csv, "w").close

        with pytest.raises(InvalidCount) as execinfo:
            # set the essential attributes
            instance = CsvHeaderConvert()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            Helper.set_property(instance, "src_dir", self._data_dir)
            Helper.set_property(instance, "src_pattern", "test(.*)\.csv")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            Helper.set_property(instance, "dest_pattern", "test_new.csv")
            Helper.set_property(
                instance, "headers", [{"key": "new_key"}, {"data": "new_data"}]
            )
            instance.execute()

        shutil.rmtree(self._data_dir)
        assert "only one" in str(execinfo.value)
github BrainPad / cliboa / tests / scenario / extract / test_ftp.py View on Github external
def test_execute_ok(self):
        try:
            os.makedirs(self._data_dir)
            instance = FtpDownload()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            # use public ftp
            Helper.set_property(instance, "host", "test.rebex.net")
            Helper.set_property(instance, "user", "demo")
            Helper.set_property(instance, "password", "password")
            Helper.set_property(instance, "src_dir", "/")
            Helper.set_property(instance, "src_pattern", "(.*).txt")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            instance.execute()
            exists_file = os.path.exists(os.path.join(self._data_dir, "readme.txt"))
        finally:
            shutil.rmtree(self._data_dir)
        assert exists_file is True
github BrainPad / cliboa / tests / scenario / extract / test_ftp.py View on Github external
def test_execute_ok(self):
        try:
            os.makedirs(self._data_dir)
            instance = FtpDownload()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            # use public ftp
            Helper.set_property(instance, "host", "test.rebex.net")
            Helper.set_property(instance, "user", "demo")
            Helper.set_property(instance, "password", "password")
            Helper.set_property(instance, "src_dir", "/")
            Helper.set_property(instance, "src_pattern", "(.*).txt")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            instance.execute()
            exists_file = os.path.exists(os.path.join(self._data_dir, "readme.txt"))
        finally:
            shutil.rmtree(self._data_dir)
        assert exists_file is True
github BrainPad / cliboa / tests / scenario / transform / test_file.py View on Github external
def test_execute_ok(self):
        try:
            # create test file
            csv_list = [["key", "data"], ["1", "spam"], ["2", "spam"], ["3", "spam"]]
            os.makedirs(self._data_dir)
            test_csv = os.path.join(self._data_dir, "test.csv")
            with open(test_csv, "w") as t:
                writer = csv.writer(t)
                writer.writerows(csv_list)

            # set the essential attributes
            instance = CsvHeaderConvert()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            Helper.set_property(instance, "src_dir", self._data_dir)
            Helper.set_property(instance, "src_pattern", "test\.csv")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            Helper.set_property(instance, "dest_pattern", "test_new.csv")
            Helper.set_property(
                instance, "headers", [{"key": "new_key"}, {"data": "new_data"}]
            )
            instance.execute()

            test_new_csv = os.path.join(self._data_dir, "test_new.csv")
            with open(test_new_csv, "r") as t:
                reader = csv.reader(t)
                line = next(reader)
        finally:
            shutil.rmtree(self._data_dir)
        assert line == ["new_key", "new_data"]
github BrainPad / cliboa / tests / scenario / extract / test_http.py View on Github external
def test_execute_ok(self):
        try:
            os.makedirs(self._data_dir)
            instance = HttpDownload()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            # use Postman echo
            Helper.set_property(instance, "src_url", "https://postman-echo.com")
            Helper.set_property(instance, "src_pattern", "get?foo1=bar1&foo2=bar2")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            Helper.set_property(instance, "dest_pattern", "test.result")
            instance.execute()
            f = open(os.path.join(self._data_dir, "test.result"), "r")
            result = f.read()
            f.close()
        finally:
            shutil.rmtree(self._data_dir)
        assert "postman-echo.com" in result
github BrainPad / cliboa / cliboa / core / manager.py View on Github external
instances = []
            if "parallel" in s_dict.keys():
                for row in s_dict.get("parallel"):
                    instance = self.__create_instance(row, yaml_scenario_list)
                    Helper.set_property(
                        instance,
                        "logger",
                        LisboaLog.get_logger(instance.__class__.__name__),
                    )

                    instances.append(instance)
                    StepArgument._put(row["step"], instance)
            else:
                instance = self.__create_instance(s_dict, yaml_scenario_list)
                Helper.set_property(
                    instance,
                    "logger",
                    LisboaLog.get_logger(instance.__class__.__name__),
                )
                instances.append(instance)
                StepArgument._put(s_dict["step"], instance)

            # Put instance to queue
            q.push(instances)

        # save queue to static area
        setattr(ScenarioQueue, "step_queue", q)
        self._logger.info("Finish to invoke scenario")
github BrainPad / cliboa / cliboa / core / manager.py View on Github external
di_params = cls_attrs.get(k)
                valid = DIScenarioFormat(k, di_params)
                valid()
                di_cls = di_params["class"]
                di_cls = globals()[di_cls]
                di_instance = di_cls()
                if di_instance:
                    self._logger.debug(
                        "An instance %s to be injected exists." % di_instance
                    )
                    del di_params["class"]

                # set attributes to instance
                if di_params:
                    for k, v in di_params.items():
                        Helper.set_property(di_instance, k, v)
                di_instances.append(di_instance)
        return di_keys, di_instances