:param spark: The spark session you are using
:param dataset_url: The dataset url to output your dataset to (e.g. ``hdfs:///path/to/dataset``)
:param schema: The :class:`petastorm.unischema.Unischema` definition of your dataset
:param row_group_size_mb: The parquet row group size to use for your dataset
:param use_summary_metadata: Whether to use the parquet summary metadata for row group indexing or a custom
indexing method. The custom indexing method is more scalable for very large datasets.
:param filesystem_factory: A filesystem factory function to be used when saving Petastorm specific metadata to the
Parquet store.
"""
spark_config = {}
_init_spark(spark, spark_config, row_group_size_mb, use_summary_metadata)
yield
# After job completes, add the unischema metadata and check for the metadata summary file
if filesystem_factory is None:
resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
user=spark.sparkContext.sparkUser())
filesystem_factory = resolver.filesystem_factory()
dataset_path = resolver.get_dataset_path()
else:
dataset_path = urlparse(dataset_url).path
filesystem = filesystem_factory()
dataset = pq.ParquetDataset(
dataset_path,
filesystem=filesystem,
validate_schema=False)
_generate_unischema_metadata(dataset, schema)
if not use_summary_metadata:
_generate_num_row_groups_per_file(dataset, spark.sparkContext, filesystem_factory)
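# --- Illustrative sketch (not part of the library source): writing a dataset with
# materialize_dataset as described above. The schema, the row generator and the file://
# output URL are assumptions made up for this example; only the materialize_dataset /
# dict_to_spark_row usage follows the petastorm API.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import NdarrayCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image', np.uint8, (32, 32, 3), NdarrayCodec(), False),
])

def _row_generator(i):
    return {'id': i, 'image': np.random.randint(0, 255, (32, 32, 3), dtype=np.uint8)}

spark = SparkSession.builder.master('local[2]').getOrCreate()
output_url = 'file:///tmp/hello_world_dataset'  # assumed output location

# materialize_dataset configures Spark for Petastorm and, on exit, writes the unischema
# metadata (and row group indexes, unless summary metadata is used) to the Parquet store.
with materialize_dataset(spark, output_url, HelloWorldSchema, row_group_size_mb=256):
    rows_rdd = spark.sparkContext.parallelize(range(10)) \
        .map(_row_generator) \
        .map(lambda d: dict_to_spark_row(HelloWorldSchema, d))
    spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
        .write.mode('overwrite').parquet(output_url)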
:param cache_extra_settings: A dictionary of extra settings to pass to the cache implementation.
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:param transform_spec: An instance of :class:`~petastorm.transform.TransformSpec` object defining how a record
is transformed after it is loaded and decoded. The transformation occurs on a worker thread/process (depends
on the ``reader_pool_type`` value).
:return: A :class:`Reader` object
"""
if dataset_url is None or not isinstance(dataset_url, six.string_types):
raise ValueError('dataset_url must be a string')
dataset_url = dataset_url[:-1] if dataset_url[-1] == '/' else dataset_url
logger.debug('dataset_url: %s', dataset_url)
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
filesystem = resolver.filesystem()
dataset_path = resolver.get_dataset_path()
if cache_type is None or cache_type == 'null':
cache = NullCache()
elif cache_type == 'local-disk':
cache = LocalDiskCache(cache_location, cache_size_limit, cache_row_size_estimate, **cache_extra_settings or {})
else:
raise ValueError('Unknown cache_type: {}'.format(cache_type))
try:
dataset_metadata.get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver)
except PetastormMetadataError:
raise RuntimeError('Currently make_reader supports reading only Petastorm datasets. '
'To read from a non-Petastorm Parquet store use make_batch_reader')
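# --- Illustrative sketch (not library source): reading a materialized Petastorm dataset
# with make_reader, enabling the 'local-disk' cache handled above. The dataset URL, cache
# location and the size figures are assumptions; the keyword names follow the parameters
# used in this function.
from petastorm import make_reader

with make_reader('file:///tmp/hello_world_dataset',
                 num_epochs=1,
                 cache_type='local-disk',
                 cache_location='/tmp/petastorm_cache',
                 cache_size_limit=10 * 2**30,          # ~10 GB on-disk cache
                 cache_row_size_estimate=100 * 2**10   # ~100 KB per decoded row
                 ) as reader:
    for row in reader:
        print(row.id)  # assumes the 'id' field from the sketch schema above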
def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_driver='libhdfs3'):
"""
Builds indexes for the dataset piece described in ``piece_info``.
:param piece_info: description of dataset piece
:param dataset_url: dataset location
:param partitions: dataset partitions
:param indexers: list of indexer objects
:param schema: dataset schema
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:return: list of indexers containing index data
"""
# Resolver in executor context will get hadoop config from environment
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
fs = resolver.filesystem()
# Create pyarrow piece
piece = compat_make_parquet_piece(piece_info.path, fs.open, row_group=piece_info.row_group,
partition_keys=piece_info.partition_keys)
# Collect column names needed for indexing
column_names = set()
for indexer in indexers:
column_names.update(indexer.column_names)
# Read columns needed for indexing
column_rows = compat_piece_read(piece, fs.open, columns=list(column_names),
partitions=partitions).to_pandas().to_dict('records')
# Decode columns values
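# --- Illustrative sketch (not library source): the shape of an indexer object consumed by
# _index_columns. Only the `column_names` attribute is grounded in the code above; the
# constructor, `build_index` and the internal dict are assumptions for illustration.
class FieldValueIndexer(object):
    """Maps every observed value of a field to the set of piece indexes containing it."""

    def __init__(self, index_name, field_name):
        self.index_name = index_name
        self.column_names = {field_name}   # read by _index_columns when selecting columns
        self._index = {}                   # field value -> set of piece indexes

    def build_index(self, decoded_rows, piece_index):
        for row in decoded_rows:
            for column in self.column_names:
                self._index.setdefault(row[column], set()).add(piece_index)
        return self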
def get_schema_from_dataset_url(dataset_url, hdfs_driver='libhdfs3'):
"""Returns a :class:`petastorm.unischema.Unischema` object loaded from a dataset specified by a url.
:param dataset_url: A dataset URL
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:return: A :class:`petastorm.unischema.Unischema` object
"""
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
validate_schema=False)
# Get a unischema stored in the dataset metadata.
stored_schema = get_schema(dataset)
return stored_schema
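# --- Illustrative sketch (not library source): inspecting the Unischema stored in an
# existing dataset's metadata. The hdfs:// URL is an assumption.
from petastorm.etl.dataset_metadata import get_schema_from_dataset_url

schema = get_schema_from_dataset_url('hdfs:///path/to/dataset', hdfs_driver='libhdfs')
for name, field in schema.fields.items():
    print(name, field.numpy_dtype, field.shape, field.nullable)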
# Cannot rely on the check in epochs.py since it runs on a separate thread. Inform the user about an
# invalid argument value as early as possible.
if num_epochs is not None and (not isinstance(num_epochs, int) or num_epochs < 1):
raise ValueError('num_epochs must be a positive integer or None')
dataset_url = dataset_url[:-1] if dataset_url[-1] == '/' else dataset_url
# 1. Resolve dataset path (hdfs://, file://) and open the parquet storage (dataset)
logger.debug('dataset_url: %s', dataset_url)
if pyarrow_filesystem is not None:
filesystem = pyarrow_filesystem
dataset_path = urlparse(dataset_url).path
else:
resolver = FilesystemResolver(dataset_url)
filesystem = resolver.filesystem()
dataset_path = resolver.get_dataset_path()
self._dataset = pq.ParquetDataset(dataset_path, filesystem=filesystem, validate_schema=False)
stored_schema = infer_or_load_unischema(self._dataset)
if isinstance(schema_fields, NGram):
self.ngram = schema_fields
self.ngram.resolve_regex_field_names(stored_schema)
else:
self.ngram = None
if self.ngram and not self.ngram.timestamp_overlap and shuffle_row_drop_partitions > 1:
raise NotImplementedError('Using timestamp_overlap=False is not implemented with'
' shuffle_options.shuffle_row_drop_partitions > 1')
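# --- Illustrative sketch (not library source): passing an NGram as schema_fields, the
# branch handled above. It reuses the hypothetical HelloWorldSchema from the
# materialize_dataset sketch earlier; delta_threshold and the timestamp field choice are
# arbitrary illustrative values.
from petastorm import make_reader
from petastorm.ngram import NGram

ngram = NGram(fields={-1: [HelloWorldSchema.id, HelloWorldSchema.image],
                      0: [HelloWorldSchema.id, HelloWorldSchema.image]},
              delta_threshold=10,
              timestamp_field=HelloWorldSchema.id)

with make_reader('file:///tmp/hello_world_dataset', schema_fields=ngram) as reader:
    for window in reader:
        # Each item is a dict keyed by the relative offsets declared above (-1 and 0).
        print(window[0].id - window[-1].id)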
hdfs_driver='libhdfs3'):
"""
Generates the metadata necessary to read a petastorm dataset and adds it to an existing dataset.
:param spark: spark session
:param dataset_url: url of existing dataset
:param unischema_class: (optional) fully qualified dataset unischema class. If not specified will attempt
to find one already in the dataset. (e.g.
:class:`examples.hello_world.generate_hello_world_dataset.HelloWorldSchema`)
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:param user: String denoting username when connecting to HDFS
"""
sc = spark.sparkContext
resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(), hdfs_driver=hdfs_driver,
user=spark.sparkContext.sparkUser())
fs = resolver.filesystem()
dataset = pq.ParquetDataset(
resolver.get_dataset_path(),
filesystem=fs,
validate_schema=False)
if unischema_class:
schema = locate(unischema_class)
if not isinstance(schema, Unischema):
raise ValueError('The specified class %s is not an instance of a petastorm.Unischema object.' % unischema_class)
else:
try:
schema = get_schema(dataset)
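# --- Illustrative sketch (not library source): how the unischema_class argument above is
# resolved. `locate` (pydoc.locate) turns a fully qualified name into the object it names;
# the class path below is the example quoted in the docstring.
from pydoc import locate

schema = locate('examples.hello_world.generate_hello_world_dataset.HelloWorldSchema')
if schema is None:
    raise ValueError('Could not locate unischema class examples.hello_world.'
                     'generate_hello_world_dataset.HelloWorldSchema')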
help='Display list of row group indexes')
parser.add_argument('--print-values', action='store_true',
help='Print index values (dataset piece indexes)')
parser.add_argument('--skip-index', nargs='+', type=str,
help='Do not display indexed values for given fields')
parser.add_argument('--hdfs-driver', type=str, default='libhdfs3',
help='A string denoting the hdfs driver to use (if using a dataset on hdfs). '
'Current choices are libhdfs (java through JNI) or libhdfs3 (C++)')
args = parser.parse_args()
if args.dataset_url and args.dataset_url[-1] == '/':
args.dataset_url = args.dataset_url[:-1]
# Create pyarrow file system
resolver = FilesystemResolver(args.dataset_url, hdfs_driver=args.hdfs_driver)
dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
validate_schema=False)
print_all = not args.schema and not args.index
if args.schema or print_all:
print('*** Schema from dataset metadata ***')
print((dataset_metadata.get_schema(dataset)))
if args.index or print_all:
index_dict = rowgroup_indexing.get_row_group_indexes(dataset)
print('*** Row group indexes from dataset metadata ***')
for index_name in index_dict:
print(('Index: {}'.format(index_name)))
if args.skip_index is None or index_name not in args.skip_index:
for field_value in index_dict[index_name].indexed_values:
print(' -- {}({})'.format(field_value,
:return: None
"""
schema = get_schema_from_dataset_url(source_url, hdfs_driver=hdfs_driver)
fields = match_unischema_fields(schema, field_regex)
if field_regex and not fields:
field_names = list(schema.fields.keys())
raise ValueError('Regular expressions (%s) do not match any fields (%s)' % (str(field_regex), str(field_names)))
if fields:
subschema = schema.create_schema_view(fields)
else:
subschema = schema
resolver = FilesystemResolver(target_url, spark.sparkContext._jsc.hadoopConfiguration(),
hdfs_driver=hdfs_driver, user=spark.sparkContext.sparkUser())
with materialize_dataset(spark, target_url, subschema, row_group_size_mb,
filesystem_factory=resolver.filesystem_factory()):
data_frame = spark.read \
.parquet(source_url)
if fields:
data_frame = data_frame.select(*[f.name for f in fields])
if not_null_fields:
not_null_condition = reduce(operator.__and__, (data_frame[f].isNotNull() for f in not_null_fields))
data_frame = data_frame.filter(not_null_condition)
if partitions_count:
data_frame = data_frame.repartition(partitions_count)
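# --- Illustrative sketch (not library source): the field selection performed above.
# match_unischema_fields narrows a Unischema to the fields whose names match any of the
# given regular expressions, and create_schema_view builds the sub-schema written out by
# the copy. The source URL and the regex patterns are assumptions.
from petastorm.etl.dataset_metadata import get_schema_from_dataset_url
from petastorm.unischema import match_unischema_fields

schema = get_schema_from_dataset_url('hdfs:///path/to/source_dataset')
fields = match_unischema_fields(schema, ['^id$', 'image.*'])
subschema = schema.create_schema_view(fields) if fields else schema
print([f.name for f in subschema.fields.values()])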
if dataset_url is None or not isinstance(dataset_url, six.string_types):
raise ValueError('dataset_url must be a string')
try:
dataset_metadata.get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver)
warnings.warn('Please use make_reader (instead of make_batch_reader) to read this dataset, '
'since it contains Petastorm metadata; you may get unexpected results otherwise. '
'Currently make_batch_reader supports reading only Parquet stores that contain '
'standard Parquet data types and do not require petastorm decoding.')
except PetastormMetadataError:
pass
dataset_url = dataset_url[:-1] if dataset_url[-1] == '/' else dataset_url
logger.debug('dataset_url: %s', dataset_url)
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
filesystem = resolver.filesystem()
dataset_path = resolver.get_dataset_path()
if cache_type is None or cache_type == 'null':
cache = NullCache()
elif cache_type == 'local-disk':
cache = LocalDiskArrowTableCache(cache_location, cache_size_limit, cache_row_size_estimate,
**cache_extra_settings or {})
else:
raise ValueError('Unknown cache_type: {}'.format(cache_type))
if reader_pool_type == 'thread':
reader_pool = ThreadPool(workers_count)
elif reader_pool_type == 'process':
serializer = ArrowTableSerializer()
reader_pool = ProcessPool(workers_count, serializer)
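# --- Illustrative sketch (not library source): reading a plain (non-Petastorm) Parquet
# store with make_batch_reader using the process pool branch above. The URL, worker count
# and epoch count are assumptions.
from petastorm import make_batch_reader

with make_batch_reader('hdfs:///path/to/parquet_store',
                       reader_pool_type='process',
                       workers_count=4,
                       num_epochs=1) as reader:
    for batch in reader:
        # Batches are rowgroup-sized namedtuples of column arrays (assumed layout).
        print(batch._fields)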