import io
import json
import logging
import os
import pickle
import tempfile
from io import StringIO

import boto3

# s3_utils, load_pointer, load_pd, save_multipart, list_bucket_prefix_s3 and
# list_bucket_prefix_suffix_s3 are project-internal helpers from sibling modules.
logger = logging.getLogger(__name__)


def save_with_fn(path, object, pickle_fn, format=None, verbose=True):
    if verbose:
        logger.log(15, 'Saving ' + str(path))
    if s3_utils.is_s3_url(path):
        format = 's3'
    if format == 's3':
        save_s3(path, object, pickle_fn, verbose=verbose)
    else:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as fout:
            pickle_fn(object, fout)
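
# Example (sketch): saving a Python object locally and to S3 with save_with_fn.
# The paths and bucket below are hypothetical; pickle_fn can be any callable
# that writes an object to an open binary file handle, e.g. pickle.dump.
def _example_save_with_fn():
    data = {'model': 'gbm', 'score': 0.42}
    save_with_fn('output/models/data.pkl', data, pickle_fn=pickle.dump)
    save_with_fn('s3://my-example-bucket/models/data.pkl', data, pickle_fn=pickle.dump)
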
def save(path, df, index=False, verbose=True, type=None, sep=',', compression='gzip', header=True, json_dump_columns=None):
    if json_dump_columns is not None:
        df = df.copy()
        for column in json_dump_columns:
            if column in df.columns.values:
                df[column] = [json.dumps(x[0]) for x in zip(df[column])]
    if type is None:
        if path[-1] == '/' and s3_utils.is_s3_url(path):  # and path[:2] == 's3'
            type = 'multipart_s3'
        elif path[-1] == '/' and not s3_utils.is_s3_url(path):  # and path[:2] != 's3'
            type = 'multipart_local'
        elif '.csv' in path:
            type = 'csv'
        elif '.parquet' in path:
            type = 'parquet'
        else:
            type = 'csv'
    if 's3' not in path[:2]:
        is_local = True
    else:
        is_local = False
    column_count = len(list(df.columns.values))
    row_count = df.shape[0]
    if type == 'csv':
        if is_local:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            df.to_csv(path, index=index, sep=sep, header=header)
        else:
            buffer = StringIO()
            df.to_csv(buffer, index=index, sep=sep, header=header)
            bucket, prefix = s3_utils.s3_path_to_bucket_prefix(s3_path=path)
            s3_resource = boto3.resource('s3')
            s3_resource.Object(bucket, prefix).put(Body=buffer.getvalue(), ACL='bucket-owner-full-control')
        if verbose:
            logger.log(15, "Saved " + str(path) + " | Columns = " + str(column_count) + " | Rows = " + str(row_count))
    elif type == 'parquet':
        try:
            df.to_parquet(path, compression=compression, engine='fastparquet')  # TODO: Might be slower than pyarrow in multiprocessing
        except Exception:
            df.to_parquet(path, compression=compression, engine='pyarrow')
        if verbose:
            logger.log(15, "Saved " + str(path) + " | Columns = " + str(column_count) + " | Rows = " + str(row_count))
    elif type == 'multipart_s3':
        bucket, prefix = s3_utils.s3_path_to_bucket_prefix(s3_path=path)
        s3_utils.delete_s3_prefix(bucket=bucket, prefix=prefix)  # TODO: Might only delete the first 1000!
        save_multipart(path=path, df=df, index=index, verbose=verbose, type='parquet', sep=sep, compression=compression, header=header, json_dump_columns=None)
    elif type == 'multipart_local':
        if os.path.isdir(path):
            for file in os.listdir(path):
                file_path = os.path.join(path, file)
                try:
                    if os.path.isfile(file_path):
                        os.unlink(file_path)  # clear out any existing part files
                except Exception as e:
                    logger.exception(e)
        save_multipart(path=path, df=df, index=index, verbose=verbose, type='parquet', sep=sep, compression=compression, header=header, json_dump_columns=None)
    else:
        raise Exception('Unknown save type: ' + type)
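
# Example (sketch): saving a DataFrame with the dispatching save() above. The
# file names and bucket are hypothetical; a trailing '/' selects the multipart
# writers, otherwise the extension selects csv or parquet (csv is the default).
def _example_save_dataframe():
    import pandas as pd
    df = pd.DataFrame({'id': [1, 2], 'label': [0, 1]})
    save('output/data.csv', df)                   # local single csv
    save('output/data.parquet', df)               # local parquet
    save('s3://my-example-bucket/data.csv', df)   # single csv on s3
    save('s3://my-example-bucket/data/', df)      # multipart parquet under an s3 prefix
    df_json = pd.DataFrame({'id': [1], 'payload': [{'a': 1}]})
    save('output/data_json.csv', df_json, json_dump_columns=['payload'])  # json-encode a dict column
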
def save_s3(path: str, obj, pickle_fn, verbose=True):
    if verbose:
        logger.info(f'save object to {path}')
    with tempfile.TemporaryFile() as f:
        pickle_fn(obj, f)
        f.flush()
        f.seek(0)
        bucket, key = s3_utils.s3_path_to_bucket_prefix(path)
        s3_client = boto3.client('s3')
        try:
            config = boto3.s3.transfer.TransferConfig()  # enable multipart uploading for files larger than 8MB
            response = s3_client.upload_fileobj(f, bucket, key, Config=config)
        except Exception:
            logger.exception('Failed to save object to s3')
            raise
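
# Example (sketch): uploading a pickled object directly with save_s3. The
# bucket and key are hypothetical; boto3 credentials come from the usual
# environment/shared-config/instance-role chain.
def _example_save_s3():
    payload = {'weights': [0.1, 0.2, 0.3]}
    save_s3('s3://my-example-bucket/artifacts/payload.pkl', payload, pickle_fn=pickle.dump)
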
def load(path, format=None, verbose=True):
    if path.endswith('.pointer'):
        format = 'pointer'
    elif s3_utils.is_s3_url(path):
        format = 's3'
    if format == 'pointer':
        content_path = load_pointer.get_pointer_content(path)
        if content_path == path:
            raise RecursionError('content_path == path! : ' + str(path))
        return load(path=content_path)
    elif format == 's3':
        if verbose:
            logger.log(15, 'Loading: %s' % path)
        s3_bucket, s3_prefix = s3_utils.s3_path_to_bucket_prefix(s3_path=path)
        s3 = boto3.resource('s3')
        return pickle.loads(s3.Bucket(s3_bucket).Object(s3_prefix).get()['Body'].read())
    if verbose:
        logger.log(15, 'Loading: %s' % path)
    with open(path, 'rb') as fin:
        object = pickle.load(fin)
    return object
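
# Example (sketch): loading pickled objects with load(). The paths are
# hypothetical; a '.pointer' file is dereferenced first, an s3:// url is read
# from S3, and anything else is read from the local filesystem.
def _example_load():
    local_obj = load('output/models/data.pkl')
    s3_obj = load('s3://my-example-bucket/models/data.pkl')
    pointed_obj = load('output/models/latest.pointer')
    return local_obj, s3_obj, pointed_obj
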
def get_pointer_content(path, verbose=True):
    if s3_utils.is_s3_url(path):
        bucket, key = s3_utils.s3_path_to_bucket_prefix(path)
        s3 = boto3.resource('s3')
        obj = s3.Object(bucket, key)
        content_path = obj.get()['Body'].read().decode('utf-8')
    else:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "r") as f:
            content_path = f.read()
    if verbose:
        logger.log(15, 'Loaded pointer file ' + str(path) + ' pointing to ' + str(content_path))
    return content_path
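
# Example (sketch): resolving a pointer file that stores the path of the real
# artifact and then loading that artifact; the file names are hypothetical.
def _example_get_pointer_content():
    content_path = get_pointer_content('output/models/latest.pointer')
    return load(content_path)
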
def load_multipart_s3(bucket, prefix, columns_to_keep=None, dtype=None, sample_count=None):
    files = list_bucket_prefix_s3(bucket, prefix)
    files_cleaned = [file for file in files if prefix + '/part-' in file]
    paths_full = [s3_utils.s3_bucket_prefix_to_path(bucket=bucket, prefix=file, version='s3') for file in files_cleaned]
    if sample_count is not None:
        logger.log(15, 'Taking sample of ' + str(sample_count) + ' of ' + str(len(paths_full)) + ' s3 files to load')
        paths_full = paths_full[:sample_count]
    df = load_pd.load(path=paths_full, columns_to_keep=columns_to_keep, dtype=dtype)
    return df
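
# Example (sketch): loading a multipart dataset written under an s3 prefix.
# Bucket, prefix and column names are hypothetical; sample_count limits the
# number of part files that are read.
def _example_load_multipart_s3():
    return load_multipart_s3(
        bucket='my-example-bucket',
        prefix='datasets/train',
        columns_to_keep=['id', 'label'],
        sample_count=10,
    )
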
# Extended variant of load_multipart_s3 adding row filters and a multiprocessing worker pool.
def load_multipart_s3(bucket, prefix, columns_to_keep=None, dtype=None, sample_count=None, filters=None, worker_count=None, multiprocessing_method='forkserver'):
    if prefix[-1] == '/':
        prefix = prefix[:-1]
    files = list_bucket_prefix_suffix_s3(bucket=bucket, prefix=prefix, suffix='/part-')
    files_cleaned = [file for file in files if prefix + '/part-' in file]
    paths_full = [s3_utils.s3_bucket_prefix_to_path(bucket=bucket, prefix=file, version='s3') for file in files_cleaned]
    if sample_count is not None:
        logger.log(15, 'Load multipart s3 taking sample of ' + str(sample_count) + ' out of ' + str(len(paths_full)) + ' files to load')
        paths_full = paths_full[:sample_count]
    # The DataFrame loader (load_pd.load) is used here, not the pickle load() defined above.
    df = load_pd.load(path=paths_full, columns_to_keep=columns_to_keep, dtype=dtype, filters=filters,
                      worker_count=worker_count, multiprocessing_method=multiprocessing_method)
    return df
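
# Example (sketch): the extended variant above additionally accepts
# parquet-style row filters and a multiprocessing worker pool; every argument
# value here is hypothetical, and the filter syntax is assumed to follow the
# (column, op, value) tuples used by the underlying parquet reader.
def _example_load_multipart_s3_filtered():
    return load_multipart_s3(
        bucket='my-example-bucket',
        prefix='datasets/train/',
        filters=[('label', '=', 1)],
        worker_count=4,
        multiprocessing_method='forkserver',
    )
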
def load_with_fn(path, pickle_fn, format=None, verbose=True):
    if path.endswith('.pointer'):
        format = 'pointer'
    elif s3_utils.is_s3_url(path):
        format = 's3'
    if format == 'pointer':
        content_path = load_pointer.get_pointer_content(path)
        if content_path == path:
            raise RecursionError('content_path == path! : ' + str(path))
        return load_with_fn(content_path, pickle_fn)
    elif format == 's3':
        if verbose:
            logger.log(15, 'Loading: %s' % path)
        s3_bucket, s3_prefix = s3_utils.s3_path_to_bucket_prefix(s3_path=path)
        s3 = boto3.resource('s3')
        # Has to be wrapped in IO buffer since s3 stream does not implement seek()
        buff = io.BytesIO(s3.Bucket(s3_bucket).Object(s3_prefix).get()['Body'].read())
        return pickle_fn(buff)
    if verbose:
        logger.log(15, 'Loading: %s' % path)
    with open(path, 'rb') as fin:
        object = pickle_fn(fin)
    return object
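
# Example (sketch): load_with_fn mirrors save_with_fn and accepts any callable
# that reads an object from an open binary file handle; the path is hypothetical.
def _example_load_with_fn():
    return load_with_fn('s3://my-example-bucket/models/data.pkl', pickle_fn=pickle.load)
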