import json
import logging
import os
from operator import attrgetter

import pyarrow.parquet as pq

logger = logging.getLogger(__name__)

# PetastormMetadataError, UNISCHEMA_KEY, ROW_GROUPS_PER_FILE_KEY and
# depickle_legacy_package_name_compatible are defined in petastorm.etl.dataset_metadata;
# the reader helpers further below come from petastorm.reader and its dependencies.


def get_schema(dataset):
    """Retrieves the schema object stored as part of the dataset metadata.

    :param dataset: an instance of :class:`pyarrow.parquet.ParquetDataset`
    :return: A :class:`petastorm.unischema.Unischema` object
    """
    if not dataset.common_metadata:
        raise PetastormMetadataError(
            'Could not find _common_metadata file. Use materialize_dataset(..) in'
            ' petastorm.etl.dataset_metadata.py to generate this file in your ETL code.'
            ' You can generate it on an existing dataset using petastorm-generate-metadata.py')

    dataset_metadata_dict = dataset.common_metadata.metadata

    # Read schema
    if UNISCHEMA_KEY not in dataset_metadata_dict:
        raise PetastormMetadataError(
            'Could not find the unischema in the dataset common metadata file.'
            ' Please provide or generate the dataset with the unischema attached.'
            ' The common metadata file might not have been generated properly.'
            ' Make sure to use materialize_dataset(..) in petastorm.etl.dataset_metadata to'
            ' properly generate this file in your ETL code.'
            ' You can generate it on an existing dataset using petastorm-generate-metadata.py')
    ser_schema = dataset_metadata_dict[UNISCHEMA_KEY]
    # Since the unischema class has moved around a few times, unpickling old schemas will not
    # work. In this case we override the old import path to get backwards compatibility.
    schema = depickle_legacy_package_name_compatible(ser_schema)

    return schema
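
# A minimal usage sketch for get_schema(); '/tmp/hello_world_dataset' is a hypothetical
# path to a dataset previously written with materialize_dataset():
#
#   dataset = pq.ParquetDataset('/tmp/hello_world_dataset', validate_schema=False)
#   unischema = get_schema(dataset)
#   print(unischema.fields)

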
"""
    # We try to get row group information from the metadata file
    metadata = dataset.metadata
    common_metadata = dataset.common_metadata
    if not metadata and not common_metadata:
        # If we are inferring the schema, we allow reading the footers to get the row group information
        return _split_row_groups_from_footers(dataset)

    if metadata and metadata.num_row_groups > 0:
        # If the metadata file exists and has row group information, we use it to split the dataset pieces
        return _split_row_groups(dataset)

    # If we don't have row groups in the common metadata, we look for the old way of loading it
    dataset_metadata_dict = common_metadata.metadata
    if ROW_GROUPS_PER_FILE_KEY not in dataset_metadata_dict:
        raise PetastormMetadataError(
            'Could not find row group metadata in _common_metadata file.'
            ' Use materialize_dataset(..) in petastorm.etl.dataset_metadata.py to generate'
            ' this file in your ETL code.'
            ' You can generate it on an existing dataset using petastorm-generate-metadata.py')
    metadata_dict_key = ROW_GROUPS_PER_FILE_KEY
    row_groups_per_file = json.loads(dataset_metadata_dict[metadata_dict_key].decode())

    rowgroups = []
    # Force the order of pieces. The order is not deterministic since it depends on the multithreaded
    # directory listing implementation inside pyarrow. We stabilize the order here; this way we get a
    # reproducible order when piece shuffling is off. This also enables implementing piece shuffling
    # given a seed.
    sorted_pieces = sorted(dataset.pieces, key=attrgetter('path'))
    for piece in sorted_pieces:
        # If we are not using absolute paths, we need to convert the path to a relative path for
        # looking up the number of row groups.
        row_groups_key = os.path.relpath(piece.path, dataset.paths)
        # Emit one piece per row group of each file
        for row_group in range(row_groups_per_file[row_groups_key]):
            rowgroups.append(pq.ParquetDatasetPiece(piece.path, row_group=row_group,
                                                    partition_keys=piece.partition_keys))
    return rowgroups
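
# A usage sketch for load_row_groups(), continuing the hypothetical dataset from above:
#
#   rowgroups = load_row_groups(dataset)
#   print('Dataset contains {} row-group pieces'.format(len(rowgroups)))

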
"""Retrieves schema object stored as part of dataset methadata.
:param dataset: an instance of :class:`pyarrow.parquet.ParquetDataset object`
:return: A :class:`petastorm.unischema.Unischema` object
"""
if not dataset.common_metadata:
raise PetastormMetadataError(
'Could not find _common_metadata file. Use materialize_dataset(..) in'
' petastorm.etl.dataset_metadata.py to generate this file in your ETL code.'
' You can generate it on an existing dataset using petastorm-generate-metadata.py')
dataset_metadata_dict = dataset.common_metadata.metadata
# Read schema
if UNISCHEMA_KEY not in dataset_metadata_dict:
raise PetastormMetadataError(
'Could not find the unischema in the dataset common metadata file.'
' Please provide or generate dataset with the unischema attached.'
' Common Metadata file might not be generated properly.'
' Make sure to use materialize_dataset(..) in petastorm.etl.dataset_metadata to'
' properly generate this file in your ETL code.'
' You can generate it on an existing dataset using petastorm-generate-metadata.py')
ser_schema = dataset_metadata_dict[UNISCHEMA_KEY]
# Since we have moved the unischema class around few times, unpickling old schemas will not work. In this case we
# override the old import path to get backwards compatibility
schema = depickle_legacy_package_name_compatible(ser_schema)
return schema
def make_reader(dataset_url, reader_pool_type='thread', workers_count=10, pyarrow_serialize=False,
                results_queue_size=50, cache_type='null', cache_location=None, cache_size_limit=None,
                cache_row_size_estimate=None, cache_extra_settings=None, hdfs_driver='libhdfs3'):
    """Creates an instance of Reader for reading a Petastorm dataset (signature and body
    abridged to the parameters used in this excerpt)."""
    logger.debug('dataset_url: %s', dataset_url)

    resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
    filesystem = resolver.filesystem()
    dataset_path = resolver.get_dataset_path()
    if cache_type is None or cache_type == 'null':
        cache = NullCache()
    elif cache_type == 'local-disk':
        cache = LocalDiskCache(cache_location, cache_size_limit, cache_row_size_estimate,
                               **cache_extra_settings or {})
    else:
        raise ValueError('Unknown cache_type: {}'.format(cache_type))

    try:
        dataset_metadata.get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver)
    except PetastormMetadataError:
        raise RuntimeError('Currently make_reader supports reading only Petastorm datasets. '
                           'To read from a non-Petastorm Parquet store use make_batch_reader')

    if reader_pool_type == 'thread':
        reader_pool = ThreadPool(workers_count, results_queue_size)
    elif reader_pool_type == 'process':
        if pyarrow_serialize:
            serializer = PyArrowSerializer()
        else:
            serializer = PickleSerializer()
        reader_pool = ProcessPool(workers_count, serializer)
    elif reader_pool_type == 'dummy':
        reader_pool = DummyPool()
    else:
        raise ValueError('Unknown reader_pool_type: {}'.format(reader_pool_type))
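
# In practice the setup above is driven through petastorm's public entry point.
# A usage sketch (the URL is hypothetical):
#
#   from petastorm import make_reader
#   with make_reader('file:///tmp/hello_world_dataset', reader_pool_type='thread') as reader:
#       for sample in reader:
#           print(sample)

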
def infer_or_load_unischema(dataset):
    """Tries to recover the Unischema object stored by the ``materialize_dataset`` function.
    If it cannot be loaded, infers a Unischema from the native Parquet schema."""
    try:
        return get_schema(dataset)
    except PetastormMetadataError:
        logger.info('Failed loading Unischema from metadata in %s. Assuming the dataset was not created with '
                    'Petastorm. Will try to construct from native Parquet schema.', dataset.paths)
        return Unischema.from_arrow_schema(dataset)
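
# A usage sketch for infer_or_load_unischema(): unlike get_schema(), it also works on a
# plain (non-Petastorm) Parquet store by falling back to the native Parquet schema:
#
#   schema = infer_or_load_unischema(dataset)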