Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
reader = csv.DictReader(fi, delimiter=delimiter)
writer = csv.DictWriter(fo, reader.fieldnames)
writer.writeheader()
date_util = DateUtil()
for row in reader:
for column in self._columns:
r = row.get(column)
if not r:
continue
row[column] = date_util.convert_date_format(r, self._formatter)
writer.writerow(row)
fo.flush()
self._logger.info("Finish %s" % self.__class__.__name__)
class ExcelConvert(FileBaseTransform):
"""
Convert excel to other format
"""
def __init__(self):
    """
    Initialize the step by delegating to the parent initializer.
    """
    # No state of its own is set here; everything comes from the base class.
    super().__init__()
def execute(self, *args):
# essential parameters check
valid = EssentialParameters(
self.__class__.__name__,
[self._src_dir, self._src_pattern, self._dest_dir, self._dest_pattern],
)
valid()
# get a target file
df2 = pandas.read_csv(
os.path.join(self._src_dir, target2_files[0]),
dtype=str,
encoding=self._encoding,
)
df = pandas.merge(df1, df2)
if "Unnamed: 0" in df.index:
del df["Unnamed: 0"]
df.to_csv(
os.path.join(self._dest_dir, self._dest_pattern),
encoding=self._encoding,
index=False,
)
class CsvHeaderConvert(FileBaseTransform):
"""
Convert csv headers
"""
def __init__(self):
    """
    Run the parent initializer and start with no header conversions.
    """
    super().__init__()
    # Empty until headers() supplies the conversion settings.
    self._headers = []
def headers(self, headers):
    """
    Store the header conversion settings on this step.

    The value is kept as given, without copying or validation.
    """
    self._headers = headers
def execute(self, *args):
# essential parameters check
valid = EssentialParameters(
self.__class__.__name__,
[
fb.write(bz2.decompress(rb))
elif ext == ".gz":
self._logger.info("Decompress gz file %s" % f)
dcom_name = os.path.splitext(os.path.basename(f))[0]
decom_path = (
os.path.join(self._dest_dir, dcom_name)
if self._dest_dir is not None
else os.path.join(self._src_dir, dcom_name)
)
with gzip.open(f, "rb") as i, open(decom_path, "wb") as o:
o.write(i.read())
else:
raise CliboaException("Unmatched any available decompress type %s" % f)
class FileCompress(FileBaseTransform):
"""
Compress files
"""
def __init__(self):
    """
    Run the parent initializer and leave the compression format unset.
    """
    super().__init__()
    # None until format() is called.
    self._format = None
def format(self, format):
    """
    Set the compression format.

    The value is stored lower-cased, so it must provide ``lower()``
    (i.e. be a string).
    """
    lowered = format.lower()
    self._format = lowered
def execute(self, *args):
# essential parameters check
valid = EssentialParameters(
self.__class__.__name__, [self._src_dir, self._src_pattern, self._format]
)
def _csv_quote(self):
    """
    Translate the configured quote name into the matching csv constant.

    Returns:
        The ``csv`` module constant whose name equals ``self._quote``.

    Raises:
        CliboaException: if ``self._quote`` is none of the supported names.
    """
    # Checked in the same order as the supported-name list in the error text.
    supported = ("QUOTE_ALL", "QUOTE_MINIMAL", "QUOTE_NONNUMERIC", "QUOTE_NONE")
    for quote_name in supported:
        if quote_name == self._quote:
            return getattr(csv, quote_name)
    raise CliboaException(
        "Unknown quote. One of the followings are allowd [QUOTE_ALL, QUOTE_MINIMAL, QUOTE_NONNUMERIC, QUOTE_NONE]"
    )
class FileConvert(FileBaseTransform):
"""
Convert file encoding
"""
def __init__(self):
    """
    Run the parent initializer and leave both encodings unset.
    """
    super().__init__()
    # Both stay None until encoding_from() / encoding_to() are called.
    self._encoding_from = self._encoding_to = None
def encoding_from(self, encoding_from):
    """
    Set the source encoding; the value is stored unchanged.
    """
    self._encoding_from = encoding_from
def encoding_to(self, encoding_to):
    """
    Set the destination encoding; the value is stored unchanged.
    """
    self._encoding_to = encoding_to
def execute(self, *args):
Replace old headers to new headers
"""
converter = {}
for headers in self._headers:
for k, v in headers.items():
converter[k] = v
new_headers = []
for oh in old_headers:
r = converter.get(oh)
new_headers.append(r if r is not None else oh)
return new_headers
class FileDivide(FileBaseTransform):
"""
Divide a file into multiple files
"""
def __init__(self):
    """
    Run the parent initializer and set the divide options to their defaults.
    """
    super().__init__()
    # No row limit configured yet, and header handling is off by default.
    self._divide_rows, self._header = None, False
def divide_rows(self, divide_rows):
    """
    Set the row count used when dividing; the value is stored unchanged.
    """
    self._divide_rows = divide_rows
def header(self, header):
    """
    Set the header flag; the value is stored unchanged.
    """
    self._header = header
def execute(self, *args):
self._encoding = encoding
def execute(self, *args):
    """
    Resolve the single source file this step should work on.

    Returns:
        The only entry returned by the parent's ``get_target_files`` for
        the configured source directory and pattern.

    Raises:
        Exception: if the number of matched files is not exactly one.
    """
    # essential parameters check
    # NOTE(review): presumably the validator raises when a value is missing;
    # confirm against EssentialParameters.
    step_name = self.__class__.__name__
    required = [self._src_dir, self._src_pattern]
    check = EssentialParameters(step_name, required)
    check()

    candidates = super().get_target_files(self._src_dir, self._src_pattern)
    if len(candidates) == 1:
        return candidates[0]
    raise Exception("Input file must be only one.")
class FileDecompress(FileBaseTransform):
"""
Decompress the specified file
"""
def __init__(self):
    """
    Initialize the step by delegating to the parent initializer.
    """
    # No state of its own is set here; everything comes from the base class.
    super().__init__()
def execute(self, *args):
files = super().get_target_files(self._src_dir, self._src_pattern)
self._logger.info("Files found %s" % files)
for f in files:
_, ext = os.path.splitext(f)
if ext == ".zip":
self._logger.info("Decompress zip file %s" % f)
with zipfile.ZipFile(f) as zp:
zp.extractall(
# convert
_, dest_ext = os.path.splitext(self._dest_pattern)
if dest_ext != ".csv":
raise InvalidFormat(
"%s is not supported format in %s. The supported format is .csv"
% (dest_ext, self._dest_pattern)
)
df = pandas.read_excel(target_files[0], encoding=self._encoding)
dest_path = os.path.join(self._dest_dir, self._dest_pattern)
self._logger.info("Convert %s to %s" % (target_files[0], dest_path))
df.to_csv(dest_path, encoding=self._encoding)
class CsvMerge(FileBaseTransform):
"""
Merge two csv files
"""
def __init__(self):
    """
    Run the parent initializer and leave both source patterns unset.
    """
    super().__init__()
    # Both stay None until src1_pattern() / src2_pattern() are called.
    self._src1_pattern = self._src2_pattern = None
def src1_pattern(self, src1_pattern):
    """
    Set the pattern for the first source file; stored unchanged.
    """
    self._src1_pattern = src1_pattern
def src2_pattern(self, src2_pattern):
    """
    Set the pattern for the second source file; stored unchanged.
    """
    self._src2_pattern = src2_pattern
def execute(self, *args):
written = False
with open(filepath, mode="w", encoding=self._encoding) as o:
if self._header is True:
o.write(self._header_row)
for i, line in enumerate(row):
written = True
o.write(line)
if i + 1 >= self._divide_rows:
left = True
break
if written is False:
os.remove(filepath)
return left
class FileRename(FileBaseTransform):
"""
Change file names by adding either a prefix or a suffix.
"""
def __init__(self):
    """
    Run the parent initializer and default prefix and suffix to empty strings.
    """
    super().__init__()
    # Empty strings by default; prefix() / suffix() replace them.
    self._prefix = self._suffix = ""
def prefix(self, prefix):
    """
    Set the prefix; the value is stored unchanged.
    """
    self._prefix = prefix
def suffix(self, suffix):
    """
    Set the suffix; the value is stored unchanged.
    """
    self._suffix = suffix
def execute(self, *args):
with open(f, "rb") as i:
self._logger.info("Compress file %s to gzip." % f)
with gzip.open(
os.path.join(dir, (os.path.basename(f) + ".gz")), "wb"
) as o:
shutil.copyfileobj(i, o)
elif self._format in ("bz2", "bzip2"):
with open(f, "rb") as i:
self._logger.info("Compress file %s to bzip2." % f)
with open(
os.path.join(dir, (os.path.basename(f) + ".bz2")), "wb"
) as o:
o.write(bz2.compress(i.read()))
class CsvColsExtract(FileBaseTransform):
"""
Remove columns from csv file.
"""
def __init__(self):
    """
    Run the parent initializer and leave the column selection unset.
    """
    super().__init__()
    # None until columns() is called.
    self._columns = None
def columns(self, columns):
    """
    Set the columns this step operates on; the value is stored unchanged.
    """
    self._columns = columns
def execute(self, *args):
file = super().execute()
valid = EssentialParameters(self.__class__.__name__, [self._columns])
valid()
self._dest_path, mode="w", encoding=self._encoding
) as fo:
reader = csv.DictReader(fi)
writer = csv.DictWriter(fo, reader.fieldnames)
writer.writeheader()
for row in reader:
for k, v in self._adjust.items():
f1 = row.get(k)
if len(f1) > v:
row[k] = f1[:v]
writer.writerow(row)
fo.flush()
class DateFormatConvert(FileBaseTransform):
"""
Convert csv (tsv) date field columns to another date field format columns
"""
def __init__(self):
    """
    Run the parent initializer and set the conversion options to their defaults.
    """
    super().__init__()
    # No columns selected and no formatter configured yet.
    self._columns, self._formatter = [], None
def columns(self, columns):
    """
    Set the date columns to convert; the value is stored unchanged.
    """
    self._columns = columns
def formatter(self, formatter):
    """
    Set the target date format; the value is stored unchanged.
    """
    self._formatter = formatter
def execute(self, *args):