Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._raw_query = None
def tblname(self, tblname):
    """
    Store the given table name on this step as ``_tblname``.
    """
    self._tblname = tblname
def raw_query(self, raw_query):
    """
    Store the given query on this step as ``_raw_query``.
    """
    self._raw_query = raw_query
def execute(self, *args):
    """
    Run the stored raw query through the sqlite adapter.

    Calls the parent ``execute`` first, then connects the adapter to
    ``_dbname``, executes ``_raw_query`` and commits.
    """
    super().execute()

    adapter = self._sqlite_adptr
    adapter.connect(self._dbname)
    adapter.execute(self._raw_query)
    adapter.commit()
class Stdout(BaseStep):
    """
    Standard output for io: input
    """

    def __init__(self):
        super().__init__()

    def execute(self, *args):
        """
        Validate the io setting, then print the cache file line by line.

        The cache file at ``self._s.cache_file`` is read as UTF-8 text and
        each line is written to standard output exactly once.
        """
        output_valid = IOOutput(self._io)
        output_valid()
        with open(self._s.cache_file, "r", encoding="utf-8") as f:
            for line in f:
                # A line read from the file still ends with its own newline.
                # Strip it so print() does not emit an extra blank line
                # after every record.
                print(line.rstrip("\n"))
import os
import shutil
import tarfile
import zipfile
import pandas
from cliboa.core.validator import EssentialParameters
from cliboa.scenario.base import BaseStep
from cliboa.util.date import DateUtil
from cliboa.util.exception import CliboaException, InvalidCount, InvalidFormat
from cliboa.util.file import File
from cliboa.util.string import StringUtil
class FileBaseTransform(BaseStep):
    """
    Base class of the file transform classes
    """

    def __init__(self):
        super().__init__()
        # Source location settings
        self._src_dir = ""
        self._src_pattern = ""
        # Destination settings
        self._dest_path = ""
        self._dest_dir = None
        self._dest_pattern = None
        # Text encoding setting, utf-8 unless overridden
        self._encoding = "utf-8"

    def src_dir(self, src_dir):
        """
        Store the source directory setting.
        """
        self._src_dir = src_dir
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
import os
from cliboa.scenario.base import BaseStep
from cliboa.scenario.validator import EssentialParameters
from cliboa.util.http import Download
class HttpExtract(BaseStep):
    """
    Step holding the parameters of an http extraction
    """

    def __init__(self):
        super().__init__()
        # Source settings
        self._src_url = None
        self._src_pattern = None
        # Destination settings
        self._dest_dir = None
        self._dest_pattern = None
        # Defaults for timeout and retry count
        self._timeout = 30
        self._retry_count = 3

    def src_url(self, src_url):
        """
        Store the source url setting.
        """
        self._src_url = src_url

    def src_pattern(self, src_pattern):
        """
        Store the source pattern setting.
        """
        self._src_pattern = src_pattern
def dest_dir(self, dest_dir):
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
import os
import re
from cliboa.scenario.base import BaseStep
from cliboa.scenario.validator import EssentialParameters
from cliboa.util.cache import ObjectStore
from cliboa.util.constant import StepStatus
from cliboa.util.sftp import Sftp
class SftpExtract(BaseStep):
    """
    Step holding the parameters of an sftp extraction
    """

    def __init__(self):
        super().__init__()
        # Source and destination settings
        self._src_dir = None
        self._src_pattern = None
        self._dest_dir = ""
        # Connection settings
        self._host = None
        self._port = 22
        self._user = None
        self._password = None
        self._key = None
        # Defaults for timeout and retry count
        self._timeout = 30
        self._retry_count = 3

    def src_dir(self, src_dir):
        """
        Store the source directory setting.
        """
        self._src_dir = src_dir
"""
def __init__(self):
    """
    Initialise the parent, the default retry count and the logger.
    """
    super().__init__()
    self._retry_count = 3
    self._logger = LisboaLog.get_logger(__name__)
def retry_count(self, retry_count):
    """
    Store the given retry count on this step as ``_retry_count``.
    """
    self._retry_count = retry_count
def execute(self, *args):
    """
    Log a start message and a finish message naming this step's class.
    """
    step_name = self.__class__.__name__
    log = self._logger
    log.info("Start %s" % step_name)
    log.info("Finish %s" % step_name)
class SampleCustomStep(BaseStep):
    """
    For unit test
    """

    def __init__(self):
        super().__init__()
        # Credential-like settings, unset until configured
        self._password = None
        self._access_key = None
        self._secret_key = None
        # Default retry count
        self._retry_count = 3

    def password(self, password):
        """
        Store the password setting.
        """
        self._password = password

    def access_key(self, access_key):
        """
        Store the access key setting.
        """
        self._access_key = access_key
fpath = src[5:]
if os.path.exists(fpath) is False:
raise FileNotFound(src)
with open(fpath, mode="r", encoding=encoding) as f:
return f.read()
return src
def _exception_dispatcher(self, e):
    """
    Handle and dispatch CliboaExceptions

    At present the given exception is simply raised again.
    """
    # TODO Currently not doing anything
    raise e
class BaseSqlite(BaseStep):
    """
    Base class of all the sqlite classes
    """

    def __init__(self):
        super().__init__()
        # Adapter object through which the sqlite steps work
        self._sqlite_adptr = SqliteAdapter()
        # Settings, filled in through the setter methods
        self._dbname = None
        self._columns = []
        self._vacuum = False

    def dbname(self, dbname):
        """
        Store the database name setting.
        """
        self._dbname = dbname

    def columns(self, columns):
        """
        Store the columns setting.
        """
        self._columns = columns
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
from google.cloud import bigquery, firestore, storage
from google.oauth2 import service_account
from cliboa.scenario.base import BaseStep
from cliboa.scenario.validator import EssentialParameters
class BaseGcp(BaseStep):
    """
    Base class of Gcp usage.
    """

    def __init__(self):
        super().__init__()
        # Both settings stay unset until configured
        self._project_id = None
        self._credentials = None

    def project_id(self, project_id):
        """
        Store the project id setting.
        """
        self._project_id = project_id

    def credentials(self, credentials):
        """
        Store the credentials setting.
        """
        self._credentials = credentials
def execute(self, *args):
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
import csv
from glob import glob
from cliboa.scenario.base import BaseStep
from cliboa.scenario.validator import EssentialParameters, IOInput
from cliboa.util.cache import ObjectStore
from cliboa.util.exception import CliboaException, FileNotFound
class FileRead(BaseStep):
    """
    The parent class to read the specified file
    """

    def __init__(self):
        super().__init__()
        # Source location settings
        self._src_path = None
        self._src_dir = None
        self._src_pattern = None
        # Text encoding setting, utf-8 unless overridden
        self._encoding = "utf-8"

    def src_path(self, src_path):
        """
        Store the source path setting.
        """
        self._src_path = src_path

    def src_dir(self, src_dir):
        """
        Store the source directory setting.
        """
        self._src_dir = src_dir
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
from cliboa.scenario.base import BaseStep
from cliboa.util.lisboa_log import LisboaLog
class SampleStep(BaseStep):
    """
    For unit test
    """

    def __init__(self):
        super().__init__()
        self._retry_count = 3
        self._logger = LisboaLog.get_logger(__name__)

    def retry_count(self, retry_count):
        """
        Store the retry count setting.
        """
        self._retry_count = retry_count

    def execute(self, *args):
        """
        Log a start message and a finish message naming this step's class.
        """
        step_name = self.__class__.__name__
        log = self._logger
        log.info("Start %s" % step_name)
        log.info("Finish %s" % step_name)
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
import os
from cliboa.core.validator import EssentialParameters
from cliboa.scenario.base import BaseStep
from cliboa.util.constant import StepStatus
from cliboa.util.sftp import Sftp
class SftpBaseLoad(BaseStep):
    """
    Step holding the parameters of an sftp load
    """

    def __init__(self):
        super().__init__()
        # Source and destination settings
        self._src_dir = ""
        self._src_pattern = ""
        self._dest_dir = ""
        # Connection settings
        self._host = None
        self._port = 22
        self._user = None
        self._password = None
        self._key = None
        # Defaults for timeout and retry count
        self._timeout = 30
        self._retry_count = 3

    def src_dir(self, src_dir):
        """
        Store the source directory setting.
        """
        self._src_dir = src_dir