the following fields: `time_mean`, `samples_per_second`, `memory_info` and `cpu`
"""
if not reader_extra_args:
reader_extra_args = dict()
if spawn_new_process:
# Re-run this benchmark in a fresh subprocess (with spawn_new_process disabled) so the
# measurement is isolated from the caller's process.
args = copy.deepcopy(locals())
args['spawn_new_process'] = False
executor = ProcessPoolExecutor(1)
future = executor.submit(reader_throughput, **args)
return future.result()
logger.info('Arguments: %s', locals())
if 'schema_fields' not in reader_extra_args:
unischema_fields = match_unischema_fields(get_schema_from_dataset_url(dataset_url), field_regex)
reader_extra_args['schema_fields'] = unischema_fields
logger.info('Fields used in the benchmark: %s', str(reader_extra_args['schema_fields']))
with make_reader(dataset_url,
num_epochs=None,
reader_pool_type=str(pool_type), workers_count=loaders_count, pyarrow_serialize=pyarrow_serialize,
**reader_extra_args) as reader:
if read_method == ReadMethod.PYTHON:
result = _time_warmup_and_work(reader, warmup_cycles_count, measure_cycles_count)
elif read_method == ReadMethod.TF:
result = _time_warmup_and_work_tf(reader, warmup_cycles_count, measure_cycles_count,
shuffling_queue_size, min_after_dequeue)
else:
raise RuntimeError('Unexpected read_method value: %s' % str(read_method))
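# _time_warmup_and_work is referenced above but not shown in this excerpt. A minimal
# sketch of such a pure-Python warmup-then-measure loop (illustrative, not the verbatim
# petastorm helper; memory_info/cpu sampling is omitted here):
import time

def _example_time_warmup_and_work(reader, warmup_cycles_count, measure_cycles_count):
    # Warm up: pull samples without timing so worker pools and caches are primed.
    for _ in range(warmup_cycles_count):
        next(reader)
    # Measure: time the read loop and derive mean latency and throughput.
    start = time.time()
    for _ in range(measure_cycles_count):
        next(reader)
    elapsed = time.time() - start
    return {'time_mean': elapsed / measure_cycles_count,
            'samples_per_second': measure_cycles_count / elapsed}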
:param source_url: A url of the dataset to be copied.
:param target_url: A url specifying location of the target dataset.
:param field_regex: A list of regex patterns. Only columns that match one of these patterns are copied to the new
dataset.
:param not_null_fields: A list of fields that must have non-NULL values in the target dataset.
:param overwrite_output: If ``False`` and there is an existing path defined by ``target_url``, the operation will
fail.
:param partitions_count: If not ``None``, the dataset is repartitioned before writing. The number of files in the
target Parquet store is determined by this parameter.
:param row_group_size_mb: The size of the rowgroup in the target dataset. Specified in megabytes.
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:param user: String denoting username when connecting to HDFS. None implies login user.
:return: None
"""
schema = get_schema_from_dataset_url(source_url, hdfs_driver=hdfs_driver)
fields = match_unischema_fields(schema, field_regex)
if field_regex and not fields:
field_names = list(schema.fields.keys())
raise ValueError('Regular expressions (%s) do not match any fields (%s)' % (str(field_regex), str(field_names)))
if fields:
subschema = schema.create_schema_view(fields)
else:
subschema = schema
resolver = FilesystemResolver(target_url, spark.sparkContext._jsc.hadoopConfiguration(),
hdfs_driver=hdfs_driver, user=spark.sparkContext.sparkUser())
with materialize_dataset(spark, target_url, subschema, row_group_size_mb,
filesystem_factory=resolver.filesystem_factory()):
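    # The body of this `with materialize_dataset(...)` block is not included in the
    # excerpt. A plausible sketch under the assumptions stated in the docstring (select
    # matched fields, drop NULL rows, optionally repartition, then write Parquet); this
    # is illustrative, not the verbatim implementation:
    data_frame = spark.read.parquet(source_url)
    if fields:
        data_frame = data_frame.select(*[f.name for f in fields])
    if not_null_fields:
        for not_null_field in not_null_fields:
            data_frame = data_frame.filter(data_frame[not_null_field].isNotNull())
    if partitions_count:
        data_frame = data_frame.repartition(partitions_count)
    data_frame.write \
        .mode('overwrite' if overwrite_output else 'error') \
        .parquet(target_url)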
dataset_url = dataset_url[:-1] if dataset_url[-1] == '/' else dataset_url
logger.debug('dataset_url: %s', dataset_url)
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
filesystem = resolver.filesystem()
dataset_path = resolver.get_dataset_path()
if cache_type is None or cache_type == 'null':
cache = NullCache()
elif cache_type == 'local-disk':
cache = LocalDiskCache(cache_location, cache_size_limit, cache_row_size_estimate, **(cache_extra_settings or {}))
else:
raise ValueError('Unknown cache_type: {}'.format(cache_type))
try:
dataset_metadata.get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver)
except PetastormMetadataError:
raise RuntimeError('Currently make_reader supports reading only Petastorm datasets. '
'To read from a non-Petastorm Parquet store use make_batch_reader')
if reader_pool_type == 'thread':
reader_pool = ThreadPool(workers_count, results_queue_size)
elif reader_pool_type == 'process':
if pyarrow_serialize:
serializer = PyArrowSerializer()
else:
serializer = PickleSerializer()
reader_pool = ProcessPool(workers_count, serializer)
elif reader_pool_type == 'dummy':
reader_pool = DummyPool()
else:
raise ValueError('Unknown reader_pool_type: {}'.format(reader_pool_type))
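# For reference, a short hedged example of driving the pool selection above through
# make_reader's public kwargs (the dataset URL is hypothetical):
from petastorm import make_reader

def example_read_with_process_pool(dataset_url='hdfs:///path/to/dataset'):
    # reader_pool_type='process' exercises the ProcessPool branch above;
    # 'thread' and 'dummy' select ThreadPool and DummyPool respectively.
    with make_reader(dataset_url, reader_pool_type='process', workers_count=4) as reader:
        for sample in reader:
            print(sample)
            break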
def dataset_as_rdd(dataset_url, spark_session, schema_fields=None, hdfs_driver='libhdfs3'):
"""
Retrieve a Spark RDD for a given Petastorm dataset.
:param dataset_url: A string for the dataset url (e.g. hdfs:///path/to/dataset)
:param spark_session: A spark session
:param schema_fields: list of unischema fields to subset, or None to read all fields.
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:return: An RDD of namedtuple records from the dataset
"""
schema = get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver)
dataset_url_parsed = urlparse(dataset_url)
resolver = FilesystemResolver(dataset_url_parsed, spark_session.sparkContext._jsc.hadoopConfiguration(),
hdfs_driver=hdfs_driver)
dataset_df = spark_session.read.parquet(resolver.get_dataset_path())
if schema_fields is not None:
# If only a subset of fields is requested, create a schema view and select just those columns
schema = schema.create_schema_view(schema_fields)
field_names = [field.name for field in schema_fields]
dataset_df = dataset_df.select(*field_names)
dataset_rows = dataset_df.rdd \
.map(lambda row: utils.decode_row(row.asDict(), schema)) \
.map(lambda record: schema.make_namedtuple(**record))
return dataset_rows
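# Illustrative usage of dataset_as_rdd (the dataset URL is hypothetical and the module
# path petastorm.spark_utils is assumed):
from pyspark.sql import SparkSession
from petastorm.spark_utils import dataset_as_rdd

def example_dataset_as_rdd(dataset_url='hdfs:///path/to/dataset'):
    spark = SparkSession.builder.appName('petastorm-rdd-example').getOrCreate()
    # Passing schema_fields=None reads all fields, as documented above.
    rdd = dataset_as_rdd(dataset_url, spark, schema_fields=None, hdfs_driver='libhdfs3')
    print(rdd.take(1))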
:param cache_size_limit: An int specifying the size limit of the cache in bytes
:param cache_row_size_estimate: An int specifying the estimated size of a row in the dataset
:param cache_extra_settings: A dictionary of extra settings to pass to the cache implementation.
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:param transform_spec: An instance of :class:`~petastorm.transform.TransformSpec` object defining how a record
is transformed after it is loaded and decoded. The transformation occurs on a worker thread/process (depends
on the ``reader_pool_type`` value).
:return: A :class:`Reader` object
"""
if dataset_url is None or not isinstance(dataset_url, six.string_types):
raise ValueError('dataset_url must be a string')
try:
dataset_metadata.get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver)
warnings.warn('Please use make_reader (instead of make_batch_reader) to read this dataset. '
'You may get unexpected results. '
'Currently make_batch_reader supports reading only Parquet stores that contain '
'standard Parquet data types and do not require petastorm decoding.')
except PetastormMetadataError:
pass
dataset_url = dataset_url[:-1] if dataset_url[-1] == '/' else dataset_url
logger.debug('dataset_url: %s', dataset_url)
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
filesystem = resolver.filesystem()
dataset_path = resolver.get_dataset_path()
if cache_type is None or cache_type == 'null':
cache = NullCache()
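# The lines above are from make_batch_reader's setup path. A short hedged usage example
# for reading a plain (non-Petastorm) Parquet store (the URL is hypothetical):
from petastorm import make_batch_reader

def example_read_parquet_batches(parquet_url='hdfs:///path/to/parquet_store'):
    # A batch reader yields named tuples of column-value arrays rather than
    # individually decoded rows.
    with make_batch_reader(parquet_url, num_epochs=1) as reader:
        for batch in reader:
            print(batch._fields)
            break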