# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def format(self):
return None
def load(self, file):
try:
return pd.read_csv(file, sep=self._sep)
except pd.errors.EmptyDataError:
return pd.DataFrame()
def dump(self, obj, file):
assert isinstance(obj, (pd.DataFrame, pd.Series)), \
f'requires pd.DataFrame or pd.Series, but {type(obj)} is passed.'
obj.to_csv(file, index=False, sep=self._sep, header=True)
class GzipFileProcessor(FileProcessor):
def format(self):
return luigi.format.Gzip
def load(self, file):
return [s.rstrip().decode() for s in file.readlines()]
def dump(self, obj, file):
if isinstance(obj, list):
for x in obj:
file.write((str(x) + '\n').encode())
else:
file.write(str(obj).encode())
class JsonFileProcessor(FileProcessor):
def format(self):
def read(self, n):
if n >= (1 << 31):
logger.info(f'reading a large file with total_bytes={n}.')
buffer = bytearray(n)
idx = 0
while idx < n:
batch_size = min(n - idx, 1 << 31 - 1)
logger.info(f'reading bytes [{idx}, {idx + batch_size})...')
buffer[idx:idx + batch_size] = self._file.read(batch_size)
idx += batch_size
logger.info('done.')
return buffer
return self._file.read(n)
class PickleFileProcessor(FileProcessor):
def format(self):
return luigi.format.Nop
def load(self, file):
if ObjectStorage.is_readable_objectstorage_instance(file):
return pickle.loads(file.read())
return pickle.load(_LargeLocalFileReader(file))
def dump(self, obj, file):
self._write(pickle.dumps(obj, protocol=4), file)
@staticmethod
def _write(buffer, file):
n = len(buffer)
idx = 0
while idx < n:
self._write(pickle.dumps(obj, protocol=4), file)
@staticmethod
def _write(buffer, file):
n = len(buffer)
idx = 0
while idx < n:
logger.info(f'writing a file with total_bytes={n}...')
batch_size = min(n - idx, 1 << 31 - 1)
logger.info(f'writing bytes [{idx}, {idx + batch_size})')
file.write(buffer[idx:idx + batch_size])
idx += batch_size
logger.info('done')
class TextFileProcessor(FileProcessor):
def format(self):
return None
def load(self, file):
return [s.rstrip() for s in file.readlines()]
def dump(self, obj, file):
if isinstance(obj, list):
for x in obj:
file.write(str(x) + '\n')
else:
file.write(str(obj))
class CsvFileProcessor(FileProcessor):
def __init__(self, sep=','):
class GzipFileProcessor(FileProcessor):
def format(self):
return luigi.format.Gzip
def load(self, file):
return [s.rstrip().decode() for s in file.readlines()]
def dump(self, obj, file):
if isinstance(obj, list):
for x in obj:
file.write((str(x) + '\n').encode())
else:
file.write(str(obj).encode())
class JsonFileProcessor(FileProcessor):
def format(self):
return None
def load(self, file):
try:
return pd.read_json(file)
except pd.errors.EmptyDataError:
return pd.DataFrame()
def dump(self, obj, file):
assert isinstance(obj, pd.DataFrame) or isinstance(obj, pd.Series) or isinstance(obj, dict), \
f'requires pd.DataFrame or pd.Series or dict, but {type(obj)} is passed.'
if isinstance(obj, dict):
obj = pd.DataFrame.from_dict(obj)
obj.to_json(file)
def load(self, file):
try:
return pd.read_json(file)
except pd.errors.EmptyDataError:
return pd.DataFrame()
def dump(self, obj, file):
assert isinstance(obj, pd.DataFrame) or isinstance(obj, pd.Series) or isinstance(obj, dict), \
f'requires pd.DataFrame or pd.Series or dict, but {type(obj)} is passed.'
if isinstance(obj, dict):
obj = pd.DataFrame.from_dict(obj)
obj.to_json(file)
class XmlFileProcessor(FileProcessor):
def format(self):
return None
def load(self, file):
try:
return ET.parse(file)
except ET.ParseError:
return ET.ElementTree()
def dump(self, obj, file):
assert isinstance(obj, ET.ElementTree), f'requires ET.ElementTree, but {type(obj)} is passed.'
obj.write(file)
class NpzFileProcessor(FileProcessor):
def format(self):
class TextFileProcessor(FileProcessor):
def format(self):
return None
def load(self, file):
return [s.rstrip() for s in file.readlines()]
def dump(self, obj, file):
if isinstance(obj, list):
for x in obj:
file.write(str(x) + '\n')
else:
file.write(str(obj))
class CsvFileProcessor(FileProcessor):
def __init__(self, sep=','):
self._sep = sep
super(CsvFileProcessor, self).__init__()
def format(self):
return None
def load(self, file):
try:
return pd.read_csv(file, sep=self._sep)
except pd.errors.EmptyDataError:
return pd.DataFrame()
def dump(self, obj, file):
assert isinstance(obj, (pd.DataFrame, pd.Series)), \
f'requires pd.DataFrame or pd.Series, but {type(obj)} is passed.'
class XmlFileProcessor(FileProcessor):
def format(self):
return None
def load(self, file):
try:
return ET.parse(file)
except ET.ParseError:
return ET.ElementTree()
def dump(self, obj, file):
assert isinstance(obj, ET.ElementTree), f'requires ET.ElementTree, but {type(obj)} is passed.'
obj.write(file)
class NpzFileProcessor(FileProcessor):
def format(self):
return luigi.format.Nop
def load(self, file):
return np.load(file)['data']
def dump(self, obj, file):
assert isinstance(obj, np.ndarray), f'requires np.ndarray, but {type(obj)} is passed.'
np.savez_compressed(file, data=obj)
def make_file_processor(file_path: str) -> FileProcessor:
extension2processor = {
'.txt': TextFileProcessor(),
'.csv': CsvFileProcessor(sep=','),
'.tsv': CsvFileProcessor(sep='\t'),