for keep_var in keep_vars:
    if isinstance(keep_var, str):
        toload_regex.append(r'^({})$'.format(keep_var))
toload_regex = re.compile('|'.join(toload_regex)).search

toload_vars: Dict[int, List[str]] = {}
for year in self.years:
    if self.parquet_engine == 'pyarrow':
        try:
            pf = pq.ParquetFile(self._fpath(self.percent, year, 'bsfab'))
        except pa.ArrowIOError:
            pf = pq.ParquetDataset(self._fpath(self.percent, year, 'bsfab'))
        cols = pf.schema.names
    elif self.parquet_engine == 'fastparquet':
        pf = fp.ParquetFile(self._fpath(self.percent, year, 'bsfab'))
        cols = pf.columns

    toload_vars[year] = [x for x in cols if toload_regex(x)]
    for keep_var in keep_vars:
        # re._pattern_type was removed in Python 3.7; re.Pattern is the public name
        if isinstance(keep_var, re.Pattern):
            toload_vars[year].extend([x for x in cols if keep_var.search(x)])

    # Deduplicate while preserving order
    toload_vars[year] = list(dict.fromkeys(toload_vars[year]))

    # Check cols against keep_vars
    # Is there an item in keep_vars that wasn't matched?
    # NOTE need to check this against regex values of keep_vars
    for var in keep_vars:
        if [x for x in toload_vars[year] if re.search(var, x)] == []:
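Stripped of the class context, the variable-selection logic above reduces to something like the following sketch; the variable names and columns are placeholders, not real Medicare schema:

import re

keep_vars = ['age', re.compile(r'^buyin\d{2}$')]   # mix of literal names and compiled patterns
cols = ['bene_id', 'age', 'buyin01', 'buyin12', 'race']  # placeholder schema

exact = re.compile('|'.join(
    r'^({})$'.format(v) for v in keep_vars if isinstance(v, str))).search
toload = [c for c in cols if exact(c)]
for v in keep_vars:
    if isinstance(v, re.Pattern):
        toload.extend(c for c in cols if v.search(c))
toload = list(dict.fromkeys(toload))  # dedupe while preserving order
# toload is now ['age', 'buyin01', 'buyin12']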
from fastparquet import ParquetFile, write as pwrite  # pwrite assumed to be fastparquet.write
import pandas as pd


def append(bucket, key1, key2, s3, output_filename):
    """Read two Parquet files from S3, concatenate them, and write the result back."""
    s3_open = s3.open
    path1 = '{}{}'.format(bucket, key1)
    pf1 = ParquetFile(path1, open_with=s3_open)
    df1 = pf1.to_pandas()
    path2 = '{}{}'.format(bucket, key2)
    pf2 = ParquetFile(path2, open_with=s3_open)
    df2 = pf2.to_pandas()
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported spelling
    data = pd.concat([df1, df2])
    pwrite('{}{}'.format(bucket, output_filename), data, open_with=s3_open,
           compression='GZIP', append=False, has_nulls=True)
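A minimal usage sketch for the helper above, assuming an s3fs filesystem; the bucket and key names are placeholders:

import s3fs

s3 = s3fs.S3FileSystem()  # credentials picked up from the environment
# Placeholder bucket/keys: concatenates part1 and part2 into combined.parquet
append('my-bucket/', 'data/part1.parquet', 'data/part2.parquet', s3,
       'data/combined.parquet')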
    verbose:
        Print logging messages to console

Returns:
    DataFrame with bene_id and bool columns for each code to search for
"""
# Determine which variables to extract
if self.parquet_engine == 'pyarrow':
    try:
        pf = pq.ParquetFile(self._fpath(self.percent, year, data_type))
    except pa.ArrowIOError:
        pf = pq.ParquetDataset(self._fpath(self.percent, year, data_type))
    all_cols = pf.schema.names
elif self.parquet_engine == 'fastparquet':
    pf = fp.ParquetFile(self._fpath(self.percent, year, data_type))
    all_cols = pf.columns

regexes = {'hcpcs': r'^hcpcs_cd$', 'icd9_sg': r'^icd_prcdr_cd(\d+)$'}
if data_type == 'carl':
    regexes['icd9_dx'] = r'icd_dgns_cd(\d*)$'
elif data_type == 'med':
    regexes['icd9_dx'] = r'^dgnscd(\d+)$'
    regexes['icd9_sg'] = r'^prcdrcd(\d+)$'
else:
    regexes['icd9_dx'] = r'^icd_dgns_cd(\d+)$'

cols: Dict[str, List[str]] = {
    'cl_id': [
        x for x in all_cols
        if re.search(r'^medparid$|^clm_id$|^claimindex$', x)],
    'pl_id': ['ehic'] if year < 2006 else ['bene_id'],
def read_parq_pandas(filepath):
    return fp.ParquetFile(filepath).to_pandas()

read["parq"]["pandas"] = lambda filepath, p, filetype: benchmark(read_parq_pandas, (filepath,), filetype)
def read_partition(fs, piece, columns, index, categories=(), pf=None, **kwargs):
    if isinstance(index, list):
        columns += index

    if pf is None:
        base, fns = _analyze_paths([piece], fs)
        scheme = get_file_scheme(fns)
        pf = ParquetFile(piece, open_with=fs.open)
        relpath = piece.replace(base, "").lstrip("/")
        for rg in pf.row_groups:
            for ch in rg.columns:
                ch.file_path = relpath
        pf.file_scheme = scheme
        pf.cats = _paths_to_cats(fns, scheme)
        pf.fn = base
        return pf.to_pandas(columns, categories, index=index)
    else:
        if isinstance(pf, tuple):
            pf = _determine_pf_parts(fs, pf[0], pf[1], **kwargs)[1]
            pf._dtypes = lambda *args: pf.dtypes  # ugly patch, could be fixed
            pf.fmd.row_groups = None
        rg_piece = pf.row_groups[piece]
        pf.fmd.key_value_metadata = None
        return pf.read_row_group_file(
if dask:
    pl = dd.read_parquet(
        self._fpath(self.percent, year, 'bsfab'),
        columns=[x for x in toload_vars if x != 'bene_id'],
        index=['bene_id'],
        engine=self.parquet_engine)
elif self.parquet_engine == 'pyarrow':
    try:
        pf = pq.ParquetFile(self._fpath(self.percent, year, 'bsfab'))
    except pa.ArrowIOError:
        pf = pq.ParquetDataset(self._fpath(self.percent, year, 'bsfab'))
    pl = pf.read(columns=toload_vars).to_pandas().set_index('bene_id')
elif self.parquet_engine == 'fastparquet':
    pf = fp.ParquetFile(self._fpath(self.percent, year, 'bsfab'))
    pl = pf.to_pandas(columns=toload_vars, index='bene_id')

if not dask:
    nobs = len(pl)

iemsg = 'Internal error: Missing column: '
iemsg2 = '\nPlease submit a bug report at\n'
iemsg2 += 'https://github.com/kylebarron/medicare_utils/issues/new'

if gender is not None:
    assert 'sex' in pl.columns, iemsg + 'sex' + iemsg2
    if pl['sex'].dtype.name == 'category':
        if pl['sex'].dtype.categories.dtype == object:
            var_type = 'string'
        else:
            var_type = 'numeric'
    elif np.issubdtype(pl['sex'].dtype, np.number):
    # Otherwise, just use 0th file
    if "_common_metadata" in fns:
        pf = ParquetFile(
            base + fs.sep + "_common_metadata",
            open_with=fs.open,
            **kwargs.get("file", {})
        )
    else:
        pf = ParquetFile(paths[0], open_with=fs.open, **kwargs.get("file", {}))
    scheme = get_file_scheme(fns)
    pf.file_scheme = scheme
    pf.cats = _paths_to_cats(fns, scheme)
    parts = paths.copy()
else:
    # There is only one file to read
    pf = ParquetFile(
        paths[0], open_with=fs.open, sep=fs.sep, **kwargs.get("file", {})
    )

return parts, pf, gather_statistics, fast_metadata
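This metadata handling is internal to dask's fastparquet engine; user code normally reaches it through dd.read_parquet. A rough sketch, with the dataset path as a placeholder and gather_statistics as the keyword existed in older dask releases:

import dask.dataframe as dd

# Placeholder path; engine='fastparquet' routes through the code above.
# gather_statistics=True forces per-file statistics to be read when no
# _metadata file exists (keyword from older dask versions).
ddf = dd.read_parquet('/data/claims/', engine='fastparquet', gather_statistics=True)
print(ddf.npartitions)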
        index=cols['pl_id'])
elif self.parquet_engine == 'pyarrow':
    try:
        pf = pq.ParquetFile(path)
        itr = (
            pf.read_row_group(i, columns=cols_toload)
            .to_pandas()
            .set_index(cols['pl_id'])
            for i in range(pf.num_row_groups))
    except pa.ArrowIOError:
        pf = pq.ParquetDataset(path)
        itr = (
            pf.read(columns=cols_toload).to_pandas().set_index(cols['pl_id'])
            for i in [1])
elif self.parquet_engine == 'fastparquet':
    pf = fp.ParquetFile(path)
    itr = pf.iter_row_groups(columns=list(cols_toload), index=cols['pl_id'])
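Outside this class, the same row-group-at-a-time pattern with fastparquet looks roughly like the following; the file path, columns, and filter value are placeholders:

import fastparquet as fp

pf = fp.ParquetFile('claims.parquet')  # placeholder path
matches = []
# Iterate one row group at a time to keep memory bounded
for chunk in pf.iter_row_groups(columns=['bene_id', 'hcpcs_cd']):
    matches.append(chunk[chunk['hcpcs_cd'] == '99213'])  # placeholder code filter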
if dask:
    cl = self._search_for_codes_df_inner(
        cl=cl,
        codes=codes,
        cols=cols,
        year=year,
        keep_vars=keep_vars,
        rename=rename,
        collapse_codes=collapse_codes,
        pl_ids_to_filter=pl_ids_to_filter)
else:
    # This holds the df's from each iteration over the claim-level
    # dataset
    all_cl = []
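The all_cl list collects one DataFrame per pass over the claim-level data; a generic sketch of how such a loop usually ends, not the library's actual inner loop:

import pandas as pd

all_cl = []
for cl_chunk in itr:  # itr as built above by the chosen parquet engine
    all_cl.append(cl_chunk)  # per-chunk searching/filtering would happen here
cl = pd.concat(all_cl, axis=0)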
paths = fs.glob(paths[0] + fs.sep + "*")
base, fns = _analyze_paths(paths, fs)
if "_metadata" in fns:
    # Using _metadata file (best-case scenario)
    pf = ParquetFile(
        base + fs.sep + "_metadata",
        open_with=fs.open,
        sep=fs.sep,
        **kwargs.get("file", {})
    )
    if gather_statistics is None:
        gather_statistics = True
elif gather_statistics is not False:
    # Scan every file
    pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
    fast_metadata = False
else:
    # Use _common_metadata file if it is available.
    # Otherwise, just use 0th file
    if "_common_metadata" in fns:
        pf = ParquetFile(
            base + fs.sep + "_common_metadata",
            open_with=fs.open,
            **kwargs.get("file", {})
        )
    else:
        pf = ParquetFile(paths[0], open_with=fs.open, **kwargs.get("file", {}))
    scheme = get_file_scheme(fns)
    pf.file_scheme = scheme
    pf.cats = _paths_to_cats(fns, scheme)
    parts = paths.copy()
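The best-case branch above depends on a _metadata file being present. fastparquet writes one (plus _common_metadata) when a dataset is written with the hive file scheme; a small sketch with placeholder data:

import pandas as pd
import fastparquet as fp

df = pd.DataFrame({'bene_id': ['a', 'b'], 'year': [2005, 2006]})  # placeholder data
# file_scheme='hive' produces a directory of part files plus _metadata and
# _common_metadata, which the reader branch above prefers over scanning every file.
fp.write('claims_dataset', df, file_scheme='hive', partition_on=['year'])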