# Imports assumed from how the tests use csv2parquet and pyarrow:
import pyarrow.parquet as pq
from csv2parquet import csv2parquet

def test_write_include_by_name():
    csv2parquet.main_with_args(csv2parquet.convert, ['csvs/simple.csv', '--include', 'a'])
    pqf = pq.ParquetFile('csvs/simple.parquet')
    schema = pqf.schema
    assert schema.names == ['a']
    row_group = pqf.read_row_group(0)
    assert row_group.num_rows == 3
    col_a = row_group.column(0).to_pylist()
    assert col_a == ['1', '2', '3']

def test_opt_invalid_ints():
    csv2parquet.main_with_args(csv2parquet.convert,
                               ['csvs/ints.csv', '--type',
                                'int8=int8?', 'int16=int16?', 'int32=int32?'])
    pqf = pq.ParquetFile('csvs/ints.parquet')
    schema = pqf.schema
    assert schema.names == ['int8', 'int16', 'int32']
    row_group = pqf.read_row_group(0)
    assert row_group.num_rows == 2
    int8 = row_group.column(0).to_pylist()
    assert int8 == [1, None]
    int16 = row_group.column(1).to_pylist()
    assert int16 == [2, None]
    int32 = row_group.column(2).to_pylist()
    assert int32 == [3, None]

def test_write_from_tsv():
    csv2parquet.main_with_args(csv2parquet.convert, ['csvs/simple2.tsv'])
    pqf = pq.ParquetFile('csvs/simple2.parquet')
    assert pqf.num_row_groups == 1
    schema = pqf.schema
    assert schema.names == ['a', 'b']
    assert schema.column(0).logical_type.type == 'STRING'
    assert schema.column(1).logical_type.type == 'STRING'
    row_group = pqf.read_row_group(0)
    assert row_group.num_rows == 1
    col_a = row_group.column(0).to_pylist()
    assert col_a == ['1']
    col_b = row_group.column(1).to_pylist()
    assert col_b == ['b']

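The three tests above exercise csv2parquet end to end. As a rough, self-contained sketch of the same pyarrow assertions (the file name and values below are made up, not taken from the test fixtures), the checks boil down to:

import pyarrow as pa
import pyarrow.parquet as pq

# Write a tiny single-row-group file to inspect (hypothetical path).
table = pa.table({'a': ['1', '2', '3'], 'b': ['x', 'y', 'z']})
pq.write_table(table, 'example.parquet')

pqf = pq.ParquetFile('example.parquet')
assert pqf.schema.names == ['a', 'b']
row_group = pqf.read_row_group(0)
assert row_group.num_rows == 3
assert row_group.column(0).to_pylist() == ['1', '2', '3']
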
if path == '':
    msg = 'path to store data must be given on first use'
    raise FileNotFoundError(msg)
conf = {'icd9': {'data_path': path}}
with open(Path.home() / '.medicare_utils.json', 'w') as f:
    json.dump(conf, f)
self.conf = conf
Path(conf['icd9']['data_path']).mkdir(parents=True, exist_ok=True)
icd9_sg_path = Path(conf['icd9']['data_path']) / 'icd9_sg.parquet'
icd9_dx_path = Path(conf['icd9']['data_path']) / 'icd9_dx.parquet'
try:
    pq.ParquetFile(icd9_sg_path)
    pq.ParquetFile(icd9_dx_path)
except Exception:
    self._download(icd9_sg_path=icd9_sg_path, icd9_dx_path=icd9_dx_path)
sg_cols = ['icd_prcdr_cd', 'year']
dx_cols = ['icd_dgns_cd', 'year']
if long:
    sg_cols.append('desc_long')
    dx_cols.append('desc_long')
else:
    sg_cols.append('desc_short')
    dx_cols.append('desc_short')
sg = pd.read_parquet(icd9_sg_path, engine='pyarrow', columns=sg_cols)
dx = pd.read_parquet(icd9_dx_path, engine='pyarrow', columns=dx_cols)

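Constructing pq.ParquetFile only parses the file's footer metadata, which is why it serves above as a cheap check that the downloaded files exist and are readable before the column-restricted pandas reads. A minimal sketch of that check-then-read pattern (the path below is a placeholder):

from pathlib import Path

import pandas as pd
import pyarrow.parquet as pq

path = Path('data') / 'icd9_sg.parquet'  # placeholder path
try:
    pq.ParquetFile(path)  # cheap: only the footer is parsed
except Exception:
    raise SystemExit(f'{path} is missing or is not a valid Parquet file')
sg = pd.read_parquet(path, engine='pyarrow', columns=['icd_prcdr_cd', 'year'])
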
Args:
    file_ - file-like object opened in binary mode (+b)
Returns:
    dict
        body - summary of main contents (if applicable)
        info - metadata for user consumption
"""
# TODO: generalize to datasets, multipart files
# As written, only works for single files, and metadata
# is slanted towards the first row_group
# local import reduces amortized latency, saves memory
import pyarrow.parquet as pq

pf = pq.ParquetFile(file_)
meta = pf.metadata
info = {}
info['created_by'] = meta.created_by
info['format_version'] = meta.format_version
info['metadata'] = {
    k.decode(): v.decode()
    for k, v in meta.metadata.items()
} if meta.metadata is not None else {}
info['num_row_groups'] = meta.num_row_groups
info['schema'] = {
    name: {
        'logical_type': meta.schema.column(i).logical_type.type,
        'max_definition_level': meta.schema.column(i).max_definition_level,

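The snippet above is truncated inside the schema dictionary. As a self-contained sketch of the same idea, here is one way to collect file- and column-level metadata from a ParquetFile into a plain dict (the key names are illustrative, not the original's):

import pyarrow.parquet as pq

def parquet_info(path):
    # Footer metadata only; no column data is read.
    meta = pq.ParquetFile(path).metadata
    schema = meta.schema
    return {
        'created_by': meta.created_by,
        'format_version': meta.format_version,
        'num_row_groups': meta.num_row_groups,
        'num_rows': meta.num_rows,
        'columns': {
            schema.column(i).name: {
                'physical_type': schema.column(i).physical_type,
                'max_definition_level': schema.column(i).max_definition_level,
            }
            for i in range(len(schema))
        },
    }
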
file (str):
    Path to the parquet file to encode.
    Parquet file must have more than one row group.
fs (feast.feature_set.FeatureSet):
    FeatureSet describing parquet files.
row_group_idx (int):
    Row group index to read and encode into byte-like FeatureRow
    protobuf objects.
Returns:
    List[bytes]:
        List of byte encoded FeatureRows from the parquet file.
"""
pq_file = pq.ParquetFile(file)

# Read the requested row group as a PyArrow table
table = pq_file.read_row_group(row_group_idx)

# Add datetime column
datetime_col = pa_column_to_timestamp_proto_column(table.column(DATETIME_COLUMN))

# Preprocess the columns by converting all their values to Proto values
proto_columns = {
    field_name: pa_column_to_proto_column(field.dtype, table.column(field_name))
    for field_name, field in fs.fields.items()
}

feature_set = f"{fs.project}/{fs.name}:{fs.version}"

# List to store result
feature_rows = []

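The feast-specific encoding that would follow relies on proto helpers not shown in this excerpt. As a generic stand-in, the core pyarrow step of turning one row group into per-row Python dicts might look like this (the file name is a placeholder):

import pyarrow.parquet as pq

pq_file = pq.ParquetFile('features.parquet')  # placeholder path
table = pq_file.read_row_group(0)
# One dict per row, mapping column name -> value; stands in for the
# FeatureRow encoding performed by the real function.
rows = [dict(zip(table.column_names, values))
        for values in zip(*(table.column(name).to_pylist()
                            for name in table.column_names))]
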
def get_schema(parquet_file):
    r = pq.ParquetFile(parquet_file)
    return r.schema

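A possible use of this helper (the file names below are examples) is a quick compatibility check before concatenating or appending files:

# Compare column names of two parts before combining them (example paths).
schema_a = get_schema('part-0.parquet')
schema_b = get_schema('part-1.parquet')
assert schema_a.names == schema_b.names, 'column names differ between parts'
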
    None
"""
if isinstance(feature_set, FeatureSet):
    name = feature_set.name
    if version is None:
        version = feature_set.version
elif isinstance(feature_set, str):
    name = feature_set
else:
    raise Exception("Feature set name must be provided")

# Read table and get row count
dir_path, dest_path = _read_table_from_source(source, chunk_size, max_workers)
pq_file = pq.ParquetFile(dest_path)
row_count = pq_file.metadata.num_rows

# Update the feature set based on the PyArrow table of the first row group
if force_update:
    feature_set.infer_fields_from_pa(
        table=pq_file.read_row_group(0),
        discard_unused_fields=True,
        replace_existing_features=True,
    )
    self.apply(feature_set)

current_time = time.time()
print("Waiting for feature set to be ready for ingestion...")
while True:
    if timeout is not None and time.time() - current_time >= timeout:

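Note that pq_file.metadata.num_rows above comes straight from the footer, so the row count is available without reading any data. A small sketch of the same bookkeeping, broken down per row group (the path is a placeholder):

import pyarrow.parquet as pq

pq_file = pq.ParquetFile('staged.parquet')  # placeholder path
total = pq_file.metadata.num_rows
per_group = [pq_file.metadata.row_group(i).num_rows
             for i in range(pq_file.num_row_groups)]
assert sum(per_group) == total
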
def split_points_unclaimed(stop_position):
    if next_block_start >= stop_position:
        # Next block starts at or after the suggested stop position. Hence
        # there will not be split points to be claimed for the range ending
        # at the suggested stop position.
        return 0
    return RangeTracker.SPLIT_POINTS_UNKNOWN

range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)
start_offset = range_tracker.start_position()
if start_offset is None:
    start_offset = 0

with self.open_file(file_name) as f:
    pf = pq.ParquetFile(f)

    # Find the first dictionary page (or data page if there is no dictionary
    # page available) offset after the given start_offset. This offset is also
    # the starting offset of any row group, since the Parquet specification
    # describes that the data pages always come before the metadata in each
    # row group.
    index = _ParquetUtils.find_first_row_group_index(pf, start_offset)
    if index != -1:
        next_block_start = _ParquetUtils.get_offset(pf, index)
    else:
        next_block_start = range_tracker.stop_position()
    number_of_row_groups = _ParquetUtils.get_number_of_row_groups(pf)

    while range_tracker.try_claim(next_block_start):
        table = pf.read_row_group(index, self._columns)

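The Beam reader claims row groups one at a time through the range tracker; stripped of that machinery, the underlying pyarrow loop is just an index walk with a column projection, roughly (path and column names are examples):

import pyarrow.parquet as pq

pf = pq.ParquetFile('events.parquet')  # placeholder path
columns = ['user_id', 'ts']            # example projection
for index in range(pf.num_row_groups):
    table = pf.read_row_group(index, columns=columns)
    # ... process one row group's worth of records ...
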
toload_regex.append(r'^(buyin\d{2})$')
if hmo_val is not None:
    toload_regex.append(r'^(hmoind\d{2})$')
if self.year_type == 'age':
    toload_regex.append(r'^(bene_dob)$')
for keep_var in keep_vars:
    if isinstance(keep_var, str):
        toload_regex.append(r'^({})$'.format(keep_var))
toload_regex = re.compile('|'.join(toload_regex)).search

toload_vars: Dict[int, List[str]] = {}
for year in self.years:
    if self.parquet_engine == 'pyarrow':
        try:
            pf = pq.ParquetFile(self._fpath(self.percent, year, 'bsfab'))
        except pa.ArrowIOError:
            pf = pq.ParquetDataset(self._fpath(self.percent, year, 'bsfab'))
        cols = pf.schema.names
    elif self.parquet_engine == 'fastparquet':
        pf = fp.ParquetFile(self._fpath(self.percent, year, 'bsfab'))
        cols = pf.columns

    toload_vars[year] = [x for x in cols if toload_regex(x)]
    for keep_var in keep_vars:
        if isinstance(keep_var, re._pattern_type):
            toload_vars[year].extend([
                x for x in cols if keep_var.search(x)])

    # Deduplicate while preserving order
    toload_vars[year] = list(dict.fromkeys(toload_vars[year]))

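The natural next step, not shown in this excerpt, is to hand each per-year column list to the reader so only those columns are deserialized; roughly (the paths below are placeholders standing in for self._fpath(...)):

import pandas as pd

# Placeholder paths keyed by year; in the real code these come from self._fpath(...).
paths = {2010: 'bsfab_2010.parquet', 2011: 'bsfab_2011.parquet'}
dfs = {
    year: pd.read_parquet(path, engine='pyarrow', columns=toload_vars[year])
    for year, path in paths.items()
}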