import io
import json
import logging
import os

import boto3
import pandas as pd
from pandas import DataFrame

# NOTE: these helpers are not defined in this excerpt; the import paths below
# assume they live alongside this module and may need adjusting.
from . import load_pointer, s3_utils, save_pointer
from .load_multipart import load_multipart, load_multipart_s3

logger = logging.getLogger(__name__)


# The first line of this signature is reconstructed from how the parameters are
# used below; the defaults on the leading arguments are assumptions.
def load(path, delimiter=None, encoding='utf-8', columns_to_keep=None, dtype=None, error_bad_lines=True, header=0,
         names=None, format=None, nrows=None, skiprows=None, usecols=None, low_memory=False, converters=None,
         filters=None, sample_count=None, worker_count=None, multiprocessing_method='forkserver') -> DataFrame:
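    """Load a DataFrame from `path`: a csv/parquet file, a multipart directory
    (local or S3), a pointer file, or a list of part paths."""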
if isinstance(path, list):
return load_multipart(
paths=path, delimiter=delimiter, encoding=encoding, columns_to_keep=columns_to_keep,
dtype=dtype, error_bad_lines=error_bad_lines, header=header, names=names, format=format,
nrows=nrows, skiprows=skiprows, usecols=usecols, low_memory=low_memory, converters=converters,
filters=filters,
worker_count=worker_count,
multiprocessing_method=multiprocessing_method
)
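    # Infer the on-disk format from the path; an explicitly passed `format` wins.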
if format is not None:
pass
elif path.endswith(save_pointer.POINTER_SUFFIX):
format = 'pointer'
elif path[-1] == '/' and s3_utils.is_s3_url(path): # and path[:2] == 's3'
format = 'multipart_s3'
elif path[-1] == '/' and not s3_utils.is_s3_url(path): # and path[:2] != 's3'
format = 'multipart_local'
elif '.parquet' in path or path[-1] == '/':
format = 'parquet'
else:
format = 'csv'
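    # A pointer file stores the real location of the data: resolve it, then
    # recurse so the content path goes through format inference again.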
if format == 'pointer':
content_path = load_pointer.get_pointer_content(path)
return load(path=content_path, delimiter=delimiter, encoding=encoding, columns_to_keep=columns_to_keep, dtype=dtype,
error_bad_lines=error_bad_lines, header=header, names=names, format=None, nrows=nrows, skiprows=skiprows,
usecols=usecols, low_memory=low_memory, converters=converters, filters=filters, sample_count=sample_count,
worker_count=worker_count, multiprocessing_method=multiprocessing_method)
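    # A multipart S3 prefix holds many part files; the helper fans the read out
    # across worker processes.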
elif format == 'multipart_s3':
bucket, prefix = s3_utils.s3_path_to_bucket_prefix(path)
return load_multipart_s3(bucket=bucket, prefix=prefix, columns_to_keep=columns_to_keep, dtype=dtype, filters=filters,
sample_count=sample_count, worker_count=worker_count, multiprocessing_method=multiprocessing_method) # TODO: Add arguments!
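    # The excerpt ends the dispatch here. A minimal sketch of the remaining
    # branches, assuming the pandas readers are the eventual sinks (the full
    # loader would also apply columns_to_keep, filters, and sample_count, and
    # would handle the 'multipart_local' case):
    elif format == 'parquet':
        return pd.read_parquet(path, columns=columns_to_keep)
    elif format == 'csv':
        return pd.read_csv(path, delimiter=delimiter, encoding=encoding, dtype=dtype, header=header,
                           names=names, nrows=nrows, skiprows=skiprows, usecols=usecols,
                           low_memory=low_memory, converters=converters)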
def load_with_fn(path, pickle_fn, format=None, verbose=True):
    if path.endswith(save_pointer.POINTER_SUFFIX):
format = 'pointer'
elif s3_utils.is_s3_url(path):
format = 's3'
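    # Only pointer files and S3 URLs need special handling; any other path is
    # read straight from local disk.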
if format == 'pointer':
content_path = load_pointer.get_pointer_content(path)
if content_path == path:
raise RecursionError('content_path == path! : ' + str(path))
        return load_with_fn(content_path, pickle_fn, verbose=verbose)
elif format == 's3':
if verbose: logger.log(15, 'Loading: %s' % path)
s3_bucket, s3_prefix = s3_utils.s3_path_to_bucket_prefix(s3_path=path)
s3 = boto3.resource('s3')
# Has to be wrapped in IO buffer since s3 stream does not implement seek()
buff = io.BytesIO(s3.Bucket(s3_bucket).Object(s3_prefix).get()['Body'].read())
return pickle_fn(buff)
if verbose: logger.log(15, 'Loading: %s' % path)
    with open(path, 'rb') as fin:
        return pickle_fn(fin)
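
# Hypothetical usage: the same call loads a pickled object from local disk, from
# S3, or through a pointer file, e.g. `obj = load_with_fn('s3://my-bucket/model.pkl', pickle.load)`.
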
def save(path, df, index=False, verbose=True, type=None, sep=',', compression='gzip', header=True, json_dump_columns=None):
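    """Write `df` to `path` (local or S3); any columns named in
    `json_dump_columns` are JSON-encoded before saving."""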
if json_dump_columns is not None:
df = df.copy()
for column in json_dump_columns:
if column in df.columns.values:
                df[column] = [json.dumps(x) for x in df[column]]
if type is None:
if path[-1] == '/' and s3_utils.is_s3_url(path): # and path[:2] == 's3'
type = 'multipart_s3'
elif path[-1] == '/' and not s3_utils.is_s3_url(path): # and path[:2] != 's3'
type = 'multipart_local'
elif '.csv' in path:
type = 'csv'
elif '.parquet' in path:
type = 'parquet'
else:
type = 'csv'
    # Anything not addressed with an 's3' prefix is treated as a local file.
    is_local = not path.startswith('s3')
    column_count = len(df.columns)
    row_count = df.shape[0]
    if is_local:
        dirname = os.path.dirname(path)
        if dirname:  # dirname is '' for bare filenames; makedirs('') would raise
            os.makedirs(dirname, exist_ok=True)
if type == 'csv':