How to use the cliboa.util.helper.Helper class in cliboa

To help you get started, we’ve selected a few cliboa examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github BrainPad / cliboa / tests / scenario / transform / test_file.py View on Github external
def test_execute_ok(self):
        """
        Run ExcelConvert against a workbook created in the data directory
        and check that the file named by dest_pattern (test.csv) shows up.
        """
        try:
            # create test file (an empty workbook is enough as a source)
            os.makedirs(self._data_dir)
            # NOTE(review): "xlxs" looks like a typo for "xlsx"; left as-is
            # because the file name and src_pattern agree with each other.
            excel_file = os.path.join(self._data_dir, "test.xlxs")
            workbook = xlsxwriter.Workbook(excel_file)
            workbook.close()

            # set the essential attributes
            instance = ExcelConvert()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            Helper.set_property(instance, "src_dir", self._data_dir)
            # raw string: "\." in a plain literal is an invalid escape
            # sequence and triggers a warning on recent Python versions
            Helper.set_property(instance, "src_pattern", r"test\.xlxs")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            Helper.set_property(instance, "dest_pattern", "test.csv")
            instance.execute()

            exists_csv = glob(os.path.join(self._data_dir, "test.csv"))
        finally:
            # always remove the scratch directory, even when execute() fails
            shutil.rmtree(self._data_dir)
        # guard against an empty glob so a missing file is reported as an
        # assertion failure instead of an IndexError
        assert exists_csv and "test.csv" in exists_csv[0]
github BrainPad / cliboa / tests / scenario / extract / test_http.py View on Github external
def test_execute_ok(self):
        """
        Download from the Postman echo service with HttpDownload and check
        that the saved response mentions the echo host.
        """
        try:
            os.makedirs(self._data_dir)
            instance = HttpDownload()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            # use Postman echo
            Helper.set_property(instance, "src_url", "https://postman-echo.com")
            Helper.set_property(instance, "src_pattern", "get?foo1=bar1&foo2=bar2")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            Helper.set_property(instance, "dest_pattern", "test.result")
            instance.execute()
            # context manager closes the handle even if read() raises
            with open(os.path.join(self._data_dir, "test.result"), "r") as f:
                result = f.read()
        finally:
            # always remove the scratch directory, even when the download fails
            shutil.rmtree(self._data_dir)
        assert "postman-echo.com" in result
github BrainPad / cliboa / tests / scenario / extract / test_sftp.py View on Github external
def test_execute_ok(self):
        """
        Download from the public rebex sftp server with SftpDownload and
        check that readme.txt ends up in the data directory.
        """
        try:
            os.makedirs(self._data_dir)
            instance = SftpDownload()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            # use public sftp
            Helper.set_property(instance, "host", "test.rebex.net")
            Helper.set_property(instance, "user", "demo")
            Helper.set_property(instance, "password", "password")
            Helper.set_property(instance, "src_dir", "/")
            Helper.set_property(instance, "src_pattern", "(.*).txt")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            instance.execute()
            exists_file = os.path.exists(os.path.join(self._data_dir, "readme.txt"))
        finally:
            # clean up even when execute() raises, so a failed run does not
            # leave the scratch directory behind for the next test
            shutil.rmtree(self._data_dir)
        assert exists_file is True
github BrainPad / cliboa / tests / scenario / extract / test_ftp.py View on Github external
def test_execute_ok(self):
        """
        Download from the public rebex ftp server with FtpDownload and
        confirm that readme.txt lands in the data directory.
        """
        try:
            os.makedirs(self._data_dir)
            instance = FtpDownload()
            # use public ftp; properties are applied in the listed order
            properties = (
                ("logger", LisboaLog.get_logger(__name__)),
                ("host", "test.rebex.net"),
                ("user", "demo"),
                ("password", "password"),
                ("src_dir", "/"),
                ("src_pattern", "(.*).txt"),
                ("dest_dir", self._data_dir),
            )
            for prop_name, prop_value in properties:
                Helper.set_property(instance, prop_name, prop_value)
            instance.execute()
            downloaded = os.path.join(self._data_dir, "readme.txt")
            exists_file = os.path.exists(downloaded)
        finally:
            # scratch directory is removed whether or not the download worked
            shutil.rmtree(self._data_dir)
        assert exists_file is True
github BrainPad / cliboa / tests / scenario / transform / test_file.py View on Github external
["2", "spam"],
                ["3", "spam"],
            ]
            test_csv2 = os.path.join(self._data_dir, "test2.csv")
            with open(test_csv2, "w") as t2:
                writer = csv.writer(t2)
                writer.writerows(csv_list2)

            # set the essential attributes
            instance = CsvMerge()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            Helper.set_property(instance, "src_dir", self._data_dir)
            Helper.set_property(instance, "src1_pattern", "test1\.csv")
            Helper.set_property(instance, "src2_pattern", "test2\.csv")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            Helper.set_property(instance, "dest_pattern", "test.csv")
            instance.execute()

            exists_csv = glob(os.path.join(self._data_dir, "test.csv"))
        finally:
            shutil.rmtree(self._data_dir)
        assert "test.csv" in exists_csv[0]
github BrainPad / cliboa / tests / scenario / test_base.py View on Github external
def test_execute_ng_invalid_dbname(self):
        """
        sqlite db does not exist.

        execute() is expected to raise an error whose message contains
        "not found"; the test fails if no error is raised at all.
        """
        try:
            instance = BaseSqlite()
            db_file = os.path.join(self._db_dir, "spam.db")
            Helper.set_property(instance, "dbname", db_file)
            Helper.set_property(instance, "tblname", "spam_table")
            instance.execute()
        except Exception as e:
            # str(e) is the same text the original produced by formatting
            # e.with_traceback(tb), which just returns e
            assert "not found" in str(e)
        else:
            # without this branch the test passed silently when nothing raised
            raise AssertionError(
                "execute() was expected to fail for a missing sqlite db"
            )
github BrainPad / cliboa / tests / scenario / test_base.py View on Github external
def test_execute_ng_invalid_dbname(self):
        """
        sqlite db does not exist.

        execute() is expected to raise an error whose message contains
        "not found"; the test fails if no error is raised at all.
        """
        try:
            instance = BaseSqlite()
            db_file = os.path.join(self._db_dir, "spam.db")
            Helper.set_property(instance, "dbname", db_file)
            Helper.set_property(instance, "tblname", "spam_table")
            instance.execute()
        except Exception as e:
            # str(e) is the same text the original produced by formatting
            # e.with_traceback(tb), which just returns e
            assert "not found" in str(e)
        else:
            # without this branch the test passed silently when nothing raised
            raise AssertionError(
                "execute() was expected to fail for a missing sqlite db"
            )
github BrainPad / cliboa / tests / scenario / extract / test_sftp.py View on Github external
def test_execute_ok(self):
        """
        Download from the public rebex sftp server with SftpDownload and
        check that readme.txt ends up in the data directory.
        """
        try:
            os.makedirs(self._data_dir)
            instance = SftpDownload()
            Helper.set_property(instance, "logger", LisboaLog.get_logger(__name__))
            # use public sftp
            Helper.set_property(instance, "host", "test.rebex.net")
            Helper.set_property(instance, "user", "demo")
            Helper.set_property(instance, "password", "password")
            Helper.set_property(instance, "src_dir", "/")
            Helper.set_property(instance, "src_pattern", "(.*).txt")
            Helper.set_property(instance, "dest_dir", self._data_dir)
            instance.execute()
            exists_file = os.path.exists(os.path.join(self._data_dir, "readme.txt"))
        finally:
            # clean up even when execute() raises, so a failed run does not
            # leave the scratch directory behind for the next test
            shutil.rmtree(self._data_dir)
        assert exists_file is True
github BrainPad / cliboa / cliboa / core / manager.py View on Github external
def __create_instance(self, s_dict, yaml_scenario_list):
        cls_name = s_dict["class"]
        self._logger.debug("Create %s instance" % cls_name)

        if self.__is_custom_cls(cls_name) is True:
            from cliboa.core.factory import CustomInstanceFactory

            instance = CustomInstanceFactory.create(cls_name)
        else:
            cls = globals()[cls_name]
            instance = cls()

        base_args = ["step", "symbol", "parallel", "io"]
        for arg in base_args:
            Helper.set_property(instance, arg, s_dict.get(arg))

        cls_attrs_dict = {}
        if isinstance(yaml_scenario_list, list) and "arguments" in s_dict.keys():
            cls_attrs_dict = s_dict["arguments"]

        # loop and set class attribute
        di_key = None
        di_instance = None
        if cls_attrs_dict:
            cls_attrs_dict = self.__extract_with_vars(cls_attrs_dict)

            # check if the keys of dependency injection
            di_keys, di_instances = self.__create_di_instance(cls_attrs_dict)
            if di_keys and di_instances:
                di_keys_and_instances = zip(di_keys, di_instances)
                for di_key, di_instance in di_keys_and_instances: