Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
fmd=None,
compression=None,
**kwargs
):
fmd = copy.copy(fmd)
if not len(df):
# Write nothing for empty partitions
rgs = []
elif partition_on:
mkdirs = lambda x: fs.mkdirs(x, exist_ok=True)
if LooseVersion(fastparquet.__version__) >= "0.1.4":
rgs = partition_on_columns(
df, partition_on, path, filename, fmd, compression, fs.open, mkdirs
)
else:
rgs = partition_on_columns(
df,
partition_on,
path,
filename,
fmd,
fs.sep,
compression,
fs.open,
mkdirs,
)
else:
with fs.open(fs.sep.join([path, filename]), "wb") as fil:
fmd.num_rows = len(df)
rg = make_part_file(
fil, df, fmd.schema, compression=compression, fmd=fmd
)
import fastparquet
# Fastparquet mutates this in a non-threadsafe manner. For now we just copy
# it before forwarding to fastparquet.
fmd = copy.copy(fmd)
if not len(df):
# Write nothing for empty partitions
rgs = None
elif partition_on:
mkdirs = lambda x: fs.mkdirs(x, exist_ok=True)
if LooseVersion(fastparquet.__version__) >= "0.1.4":
rgs = partition_on_columns(
df, partition_on, path, filename, fmd, compression, fs.open, mkdirs
)
else:
rgs = partition_on_columns(
df,
partition_on,
path,
filename,
fmd,
fs.sep,
compression,
fs.open,
mkdirs,
)
else:
# Fastparquet current doesn't properly set `num_rows` in the output
# metadata. Set it here to fix that.
fmd.num_rows = len(df)
with fs.open(fs.sep.join([path, filename]), "wb") as fil:
rgs = make_part_file(fil, df, fmd.schema, compression=compression, fmd=fmd)
def _write_partition_fastparquet(
    df, fs, path, filename, fmd, compression, partition_on
):
    """Write a single dataframe partition to storage with fastparquet.

    Parameters
    ----------
    df
        The partition to write. An empty partition (``len(df) == 0``) is
        skipped entirely and nothing is written.
    fs
        Filesystem object; this function uses ``fs.open``, ``fs.mkdirs`` and
        ``fs.sep``.
    path
        Directory under which the output is written.
    filename
        Name of the part file inside ``path``.
    fmd
        fastparquet file metadata. A shallow copy is taken before use, so the
        caller's object is not mutated by fastparquet.
    compression
        Forwarded unchanged to the fastparquet writer functions.
    partition_on
        Column names to partition the output by. When falsy, the whole
        partition is written to the single file ``path/filename``.

    Returns
    -------
    ``None`` for an empty partition; otherwise the value returned by
    fastparquet's ``partition_on_columns`` or ``make_part_file``.
    """
    from fastparquet.writer import partition_on_columns, make_part_file
    import fastparquet

    # Fastparquet mutates this in a non-threadsafe manner. For now we just copy
    # it before forwarding to fastparquet.
    fmd = copy.copy(fmd)

    if not len(df):
        # Write nothing for empty partitions
        return None

    if partition_on:

        def mkdirs(x):
            # Directory creation callback handed to fastparquet; tolerate
            # directories that already exist.
            return fs.mkdirs(x, exist_ok=True)

        if LooseVersion(fastparquet.__version__) >= "0.1.4":
            rgs = partition_on_columns(
                df, partition_on, path, filename, fmd, compression, fs.open, mkdirs
            )
        else:
            # Older fastparquet releases take the path separator as an extra
            # positional argument before ``compression``.
            rgs = partition_on_columns(
                df,
                partition_on,
                path,
                filename,
                fmd,
                fs.sep,
                compression,
                fs.open,
                mkdirs,
            )
        return rgs

    # Fastparquet current doesn't properly set `num_rows` in the output
    # metadata. Set it here to fix that.
    fmd.num_rows = len(df)
    with fs.open(fs.sep.join([path, filename]), "wb") as fil:
        rgs = make_part_file(fil, df, fmd.schema, compression=compression, fmd=fmd)
    return rgs
fs,
filename,
partition_on,
return_metadata,
fmd=None,
compression=None,
**kwargs
):
fmd = copy.copy(fmd)
if not len(df):
# Write nothing for empty partitions
rgs = []
elif partition_on:
mkdirs = lambda x: fs.mkdirs(x, exist_ok=True)
if LooseVersion(fastparquet.__version__) >= "0.1.4":
rgs = partition_on_columns(
df, partition_on, path, filename, fmd, compression, fs.open, mkdirs
)
else:
rgs = partition_on_columns(
df,
partition_on,
path,
filename,
fmd,
fs.sep,
compression,
fs.open,
mkdirs,
)
else:
with fs.open(fs.sep.join([path, filename]), "wb") as fil: