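# The snippets below assume these module-level imports; project-local helpers
# (s3_utils, load_pointer, load_multipart, load_multipart_s3, save_multipart,
# logger) are assumed to come from the surrounding package.
import os
from io import StringIO
from os import listdir
from os.path import isfile, join

import boto3
import pandas as pd

# Fragment of a load() helper: infer the on-disk format from the path, then
# dispatch to the matching reader. The enclosing def and the earlier branches
# of the detection chain are omitted from this snippet.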
elif path[-1] == '/' and s3_utils.is_s3_url(path):  # inferred: mirrors the local-path branch below
    format = 'multipart_s3'
elif path[-1] == '/' and not s3_utils.is_s3_url(path):  # and path[:2] != 's3'
    format = 'multipart_local'
elif '.parquet' in path or path[-1] == '/':
    format = 'parquet'
else:
    format = 'csv'

if format == 'pointer':
    # A pointer file stores the real location of the data; resolve it and recurse.
    content_path = load_pointer.get_pointer_content(path)
    return load(path=content_path, delimiter=delimiter, encoding=encoding, columns_to_keep=columns_to_keep, dtype=dtype,
                error_bad_lines=error_bad_lines, header=header, names=names, format=None, nrows=nrows, skiprows=skiprows,
                usecols=usecols, low_memory=low_memory, converters=converters, filters=filters, sample_count=sample_count,
                worker_count=worker_count, multiprocessing_method=multiprocessing_method)
elif format == 'multipart_s3':
    bucket, prefix = s3_utils.s3_path_to_bucket_prefix(path)
    return load_multipart_s3(bucket=bucket, prefix=prefix, columns_to_keep=columns_to_keep, dtype=dtype, filters=filters,
                             sample_count=sample_count, worker_count=worker_count, multiprocessing_method=multiprocessing_method)  # TODO: Add arguments!
elif format == 'multipart_local':
    # Gather the 'part-' files produced by a multipart save and load them together.
    paths = [join(path, f) for f in listdir(path) if isfile(join(path, f)) and f.startswith('part-')]
    return load_multipart(
        paths=paths, delimiter=delimiter, encoding=encoding, columns_to_keep=columns_to_keep,
        dtype=dtype, error_bad_lines=error_bad_lines, header=header, names=names, format=None,
        nrows=nrows, skiprows=skiprows, usecols=usecols, low_memory=low_memory, converters=converters,
        filters=filters,
        worker_count=worker_count,
        multiprocessing_method=multiprocessing_method,
    )
elif format == 'parquet':
    try:
        df = pd.read_parquet(path, columns=columns_to_keep, engine='fastparquet')  # TODO: Deal with extremely strange issue resulting from torch being present in package, will cause read_parquet to either freeze or Segmentation Fault when performing multiprocessing
        column_count_full = len(df.columns)
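
# The exception handling and return at the end of the parquet branch are truncated
# here. The next fragment comes from the companion save()-style helper, which takes
# roughly (path, df, index, verbose, type, sep, compression, header,
# json_dump_columns) — signature inferred from the parameters it references — and
# the assignment below is the fallback of its truncated type-detection chain.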
type = 'csv'

if 's3' not in path[:2]:
    is_local = True
else:
    is_local = False
column_count = len(list(df.columns.values))
row_count = df.shape[0]
if is_local:
    dirname = os.path.dirname(path)
    if dirname:  # os.makedirs('') raises when the path has no directory component
        os.makedirs(dirname, exist_ok=True)
if type == 'csv':
    if is_local:
        df.to_csv(path, index=index, sep=sep, header=header)
    else:
        # Render the CSV into an in-memory buffer, then upload it to S3.
        buffer = StringIO()
        df.to_csv(buffer, index=index, sep=sep, header=header)
        bucket, prefix = s3_utils.s3_path_to_bucket_prefix(s3_path=path)
        s3_resource = boto3.resource('s3')
        s3_resource.Object(bucket, prefix).put(Body=buffer.getvalue(), ACL='bucket-owner-full-control')
    if verbose:
        logger.log(15, "Saved " + str(path) + " | Columns = " + str(column_count) + " | Rows = " + str(row_count))
elif type == 'parquet':
    try:
        df.to_parquet(path, compression=compression, engine='fastparquet')  # TODO: Might be slower than pyarrow in multiprocessing
    except Exception:
        # Fall back to pyarrow if fastparquet cannot write this frame.
        df.to_parquet(path, compression=compression, engine='pyarrow')
    if verbose:
        logger.log(15, "Saved " + str(path) + " | Columns = " + str(column_count) + " | Rows = " + str(row_count))
elif type == 'multipart_s3':
    bucket, prefix = s3_utils.s3_path_to_bucket_prefix(s3_path=path)
    s3_utils.delete_s3_prefix(bucket=bucket, prefix=prefix)  # TODO: Might only delete the first 1000!
    save_multipart(path=path, df=df, index=index, verbose=verbose, type='parquet', sep=sep, compression=compression, header=header, json_dump_columns=None)
elif type == 'multipart_local':
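
# The 'multipart_local' save branch is truncated here. The final fragment is the
# pointer-file reader that the 'pointer' branch of load() calls via
# load_pointer.get_pointer_content().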
def get_pointer_content(path, verbose=True):
    """Return the contents of a pointer file, i.e. the path it points to."""
    if s3_utils.is_s3_url(path):
        # Read the pointer file directly from S3.
        bucket, key = s3_utils.s3_path_to_bucket_prefix(path)
        s3 = boto3.resource('s3')
        obj = s3.Object(bucket, key)
        content_path = obj.get()['Body'].read().decode('utf-8')
    else:
        with open(path, "r") as f:
            content_path = f.read()
    if verbose:
        logger.log(15, 'Loaded pointer file ' + str(path) + ' pointing to ' + str(content_path))
    return content_path
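
# Usage sketch (not from the original module; the file name and the target path are
# hypothetical): a pointer file's body is simply the real location of the data, so
# resolving it and passing the result back to load() is enough.
if __name__ == '__main__':
    with open('train_data.pointer', 'w') as f:
        f.write('s3://my-bucket/datasets/train.parquet')
    real_path = get_pointer_content('train_data.pointer')
    print(real_path)  # -> 's3://my-bucket/datasets/train.parquet'
    # df = load(path=real_path)  # would dispatch to the parquet branch above (needs AWS access)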