def test_leak3():
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    df = pd.DataFrame({'a{0}'.format(i): [1, 2, 3, 4]
                       for i in range(50)})
    table = pa.Table.from_pandas(df, preserve_index=False)

    # tm is the pandas testing-utilities module used by this test suite.
    writer = pq.ParquetWriter('leak_test_' + tm.rands(5) + '.parquet',
                              table.schema)

    def func():
        writer.write_table(table, row_group_size=len(table))

    # This does not "leak" per se but we do want to have this use as little
    # memory as possible
    assert_does_not_leak(func, iterations=500,
                         check_interval=50, tolerance=20)
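
# A minimal sketch of what a leak-check helper such as assert_does_not_leak
# could look like, sampling pyarrow.total_allocated_bytes() every
# check_interval iterations; the helper name, the MiB tolerance unit, and the
# use of the Arrow allocator are assumptions, not pyarrow's actual test utility.
import pyarrow as pa

def assert_does_not_leak(func, iterations=500, check_interval=50, tolerance=20):
    baseline = pa.total_allocated_bytes()
    for i in range(iterations):
        func()
        if (i + 1) % check_interval == 0:
            growth = pa.total_allocated_bytes() - baseline
            assert growth < tolerance * (1 << 20), (
                "Arrow allocations grew by %d bytes" % growth)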

# double check that all of the years you are asking for are actually in the data
_verify_cems_args(data_path, epacems_years, epacems_states)
for file in data_path.iterdir():
    if "epacems" in file.name:
        df_name = file.name[:file.name.find(".")]
        year = df_name[25:29]
        state = df_name[30:]
        # only convert the years and states that you actually want
        if int(year) in epacems_years and state.upper() in epacems_states:
            df = pd.read_csv(file, parse_dates=['operating_datetime_utc'])
            logger.info(
                f"Converted {len(df)} records for {year} and {state}."
            )
            df = year_from_operating_datetime(df).astype(IN_DTYPES)
            pq.write_to_dataset(
                pa.Table.from_pandas(
                    df, preserve_index=False, schema=schema),
                root_path=str(out_dir), partition_cols=list(partition_cols),
                compression=compression)
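
# Hedged read-back example for the partitioned dataset written above: the
# partition columns come back as ordinary columns when the whole directory is
# read; out_dir is the same root path passed to pq.write_to_dataset.
import pyarrow.parquet as pq

epacems_df = pq.ParquetDataset(str(out_dir)).read().to_pandas()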

@classmethod
def put(cls, obj):
    """Put an object in the Plasma store and wrap it in this object.

    Args:
        obj: The object to be put.

    Returns:
        A `PyarrowOnRayFramePartition` object.
    """
    return PyarrowOnRayFramePartition(ray.put(pyarrow.Table.from_pandas(obj)))
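
# Hedged usage sketch mirroring what put() does internally: a pandas frame is
# converted to an Arrow table, stored with ray.put(), and retrieved with
# ray.get(); the DataFrame contents here are illustrative only.
import pandas as pd
import pyarrow
import ray

ray.init(ignore_reinit_error=True)
obj_ref = ray.put(pyarrow.Table.from_pandas(pd.DataFrame({"x": [1, 2, 3]})))
round_tripped = ray.get(obj_ref)  # comes back as a pyarrow.Table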

def setup(self, n, dtype):
    super(PandasConversionsFromArrow, self).setup(n, dtype)
    self.arrow_data = pa.Table.from_pandas(self.data)
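
# Hedged illustration of the conversion direction this benchmark presumably
# exercises: Arrow back to pandas via Table.to_pandas().
import pandas as pd
import pyarrow as pa

table = pa.Table.from_pandas(pd.DataFrame({"x": range(1000)}))
df_back = table.to_pandas()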

    chunk_size (int):
        Number of rows to load and ingest at a time.
    max_workers (int):
        Number of worker processes to use to encode values.

Returns:
    Tuple[str, str]:
        Tuple containing parent directory path and destination path to
        parquet file.
"""
# Pandas DataFrame detected
if isinstance(source, pd.DataFrame):
    table = pa.Table.from_pandas(df=source)

# Inferring a string path
elif isinstance(source, str):
    file_path = source
    filename, file_ext = os.path.splitext(file_path)

    if ".csv" in file_ext:
        from pyarrow import csv
        # Read the full path, not the extension-stripped stem.
        table = csv.read_csv(file_path)
    elif ".json" in file_ext:
        from pyarrow import json
        table = json.read_json(file_path)
    else:
        table = pq.read_table(file_path)
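
# Hedged standalone example of the three reader branches above; the file names
# are placeholders and are assumed to exist locally.
import pyarrow.parquet as pq
from pyarrow import csv, json

csv_table = csv.read_csv("events.csv")
json_table = json.read_json("events.json")
parquet_table = pq.read_table("events.parquet")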

def save_dataframe(self, dataframe):
    """
    Save a DataFrame to the store.
    """
    storepath = self.temporary_object_path(str(uuid.uuid4()))

    # switch parquet lib
    parqlib = self.get_parquet_lib()
    if isinstance(dataframe, pd.DataFrame):
        # parqlib is ParquetLib.ARROW; other parquet libs are deprecated, remove?
        import pyarrow as pa
        from pyarrow import parquet
        table = pa.Table.from_pandas(dataframe)
        parquet.write_table(table, storepath)
    elif parqlib is ParquetLib.SPARK:
        from pyspark import sql as sparksql
        assert isinstance(dataframe, sparksql.DataFrame)
        dataframe.write.parquet(storepath)
    else:
        assert False, "Unimplemented ParquetLib %s" % parqlib

    # Move serialized DataFrame to object store
    if os.path.isdir(storepath):  # Pyspark
        hashes = []
        files = [ofile for ofile in os.listdir(storepath) if ofile.endswith(".parquet")]
        for obj in files:
            path = os.path.join(storepath, obj)
            objhash = digest_file(path)
            self._move_to_store(path, objhash)
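
# Minimal sketch of a content-hashing helper in the spirit of digest_file,
# assuming a SHA-256 digest over the file's bytes; the real helper and its
# digest algorithm may differ.
import hashlib

def digest_file(path, block_size=1 << 20):
    sha = hashlib.sha256()
    with open(path, "rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            sha.update(block)
    return sha.hexdigest()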

def _save(self, data: pd.DataFrame) -> None:
    save_path = self._get_save_path()
    table = pa.Table.from_pandas(data)
    pq.write_table(
        table=table, where=save_path, filesystem=self._gcs, **self._save_args
    )

    # GCS maintains a cache of the directory listing,
    # so invalidate it to see new files
    self.invalidate_cache()
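
# Hedged sketch of a matching load path, assuming the same GCS filesystem
# handle and a _get_load_path() counterpart to _get_save_path(); both names
# are assumptions about the surrounding dataset class.
def _load(self) -> pd.DataFrame:
    load_path = self._get_load_path()
    table = pq.read_table(load_path, filesystem=self._gcs)
    return table.to_pandas()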

def store(self, store, key_prefix, df):
    key = "{}.parquet".format(key_prefix)
    if isinstance(df, pa.Table):
        table = df
    else:
        table = pa.Table.from_pandas(df)
    buf = pa.BufferOutputStream()

    # On Arrow versions before 0.15.0, dictionary-encoded columns are reset to
    # plain columns before the table is written in multiple row groups.
    if (
        self.chunk_size
        and self.chunk_size < len(table)
        and not ARROW_LARGER_EQ_0150
    ):
        table = _reset_dictionary_columns(table)

    pq.write_table(
        table,
        buf,
        version=self._PARQUET_VERSION,
        chunk_size=self.chunk_size,
        compression=self.compression,
        coerce_timestamps="us",
    )
    store.put(key, buf.getvalue().to_pybytes())
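
# Hedged round-trip check: the serialized bytes written via store.put() above
# can be read back into a Table through a pa.BufferReader; this assumes the
# key-value store exposes a matching get() that returns the raw bytes.
restored = pq.read_table(pa.BufferReader(store.get(key)))
assert restored.num_rows == len(table)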