:param shuffle_row_drop_partition: A tuple of the current row-drop partition and the total number of partitions.
:return:
"""
if not self._dataset:
self._dataset = pq.ParquetDataset(
self._dataset_path,
filesystem=self._filesystem,
validate_schema=False)
piece = self._split_pieces[piece_index]
# Open the row-group's file using the dataset's filesystem
parquet_file = ParquetFile(self._dataset.fs.open(piece.path))
if not isinstance(self._local_cache, NullCache):
if worker_predicate:
raise RuntimeError('Local cache is not supported together with predicates, '
'unless the dataset is partitioned by the column the predicate operates on.')
if shuffle_row_drop_partition[1] != 1:
raise RuntimeError('Local cache is not supported together with shuffle_row_drop_partitions > 1')
if worker_predicate:
all_cols = self._load_rows_with_predicate(parquet_file, piece, worker_predicate, shuffle_row_drop_partition)
else:
# Using a hash of the dataset path together with the relative path in order to:
# 1. Make sure that if a common cache serves multiple processes (e.g. Redis), we don't have conflicts
# 2. The dataset path is hashed to make sure we don't create overly long keys, which may be incompatible with
#    some cache implementations
# 3. Still leave the relative path and the piece_index in plain text to make it easier to debug
cache_key = '{}:{}:{}'.format(hashlib.md5(self._dataset_path.encode('utf-8')).hexdigest(),
piece.path, piece_index)
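# Hedged sketch (not part of the original snippet): the cache key built above is typically
# consumed through the cache's get(key, fill_cache_func) interface, so the loader callable
# (`self._load_rows` here is an assumed helper) only runs on a cache miss.
all_cols = self._local_cache.get(cache_key,
                                 lambda: self._load_rows(parquet_file, piece, shuffle_row_drop_partition))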
warnings.warn('Please use make_reader (instead of \'make_batch_reader\') to read this dataset. '
              'You may get unexpected results. '
              'Currently make_batch_reader supports reading only Parquet stores that contain '
              'standard Parquet data types and do not require petastorm decoding.')
except PetastormMetadataError:
pass
dataset_url = dataset_url[:-1] if dataset_url[-1] == '/' else dataset_url
logger.debug('dataset_url: %s', dataset_url)
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
filesystem = resolver.filesystem()
dataset_path = resolver.get_dataset_path()
if cache_type is None or cache_type == 'null':
cache = NullCache()
elif cache_type == 'local-disk':
cache = LocalDiskArrowTableCache(cache_location, cache_size_limit, cache_row_size_estimate,
**cache_extra_settings or {})
else:
raise ValueError('Unknown cache_type: {}'.format(cache_type))
if reader_pool_type == 'thread':
reader_pool = ThreadPool(workers_count)
elif reader_pool_type == 'process':
serializer = ArrowTableSerializer()
reader_pool = ProcessPool(workers_count, serializer)
elif reader_pool_type == 'dummy':
reader_pool = DummyPool()
else:
raise ValueError('Unknown reader_pool_type: {}'.format(reader_pool_type))
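# Hedged usage sketch (dataset URL, worker count and cache sizes are hypothetical): the
# cache_type / reader_pool_type branches above correspond to these make_batch_reader arguments.
from petastorm import make_batch_reader

with make_batch_reader('file:///tmp/parquet_store',
                       reader_pool_type='thread', workers_count=4,
                       cache_type='local-disk', cache_location='/tmp/petastorm_cache',
                       cache_size_limit=10 * 2**30, cache_row_size_estimate=256) as reader:
    for batch in reader:
        pass  # each item is a named tuple holding a batch of rows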
self._dataset = pq.ParquetDataset(dataset_path, filesystem=filesystem, validate_schema=False)
stored_schema = infer_or_load_unischema(self._dataset)
if isinstance(schema_fields, NGram):
self.ngram = schema_fields
self.ngram.resolve_regex_field_names(stored_schema)
else:
self.ngram = None
if self.ngram and not self.ngram.timestamp_overlap and shuffle_row_drop_partitions > 1:
raise NotImplementedError('Using timestamp_overlap=False is not implemented with'
' shuffle_options.shuffle_row_drop_partitions > 1')
cache = cache or NullCache()
shuffle_row_drop_partitions = self._normalize_shuffle_options(shuffle_row_drop_partitions, self._dataset)
# Make a schema view (a view is a Unischema containing only a subset of fields).
# Will raise an exception if invalid schema fields are in schema_fields.
fields = schema_fields if isinstance(schema_fields, collections.abc.Iterable) else None
self.schema = stored_schema.create_schema_view(fields) if fields else stored_schema
# 2. Get a list of all groups
row_groups = dataset_metadata.load_row_groups(self._dataset)
# 3. Filter rowgroups
filtered_row_groups, worker_predicate = self._filter_row_groups(self._dataset, row_groups, predicate,
rowgroup_selector, cur_shard,
shard_count)
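# Hedged sketch of how cur_shard / shard_count are usually supplied from user code (the URL is
# hypothetical): each shard is handed a disjoint subset of the filtered row groups.
from petastorm import make_reader

reader_shard_0 = make_reader('hdfs:///datasets/mydataset', cur_shard=0, shard_count=2)
reader_shard_1 = make_reader('hdfs:///datasets/mydataset', cur_shard=1, shard_count=2)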
if isinstance(schema_fields, NGram):
self.ngram = schema_fields
self.ngram.resolve_regex_field_names(stored_schema)
else:
self.ngram = None
# By default, use the original method of working with lists of dictionaries rather than Arrow tables
worker_class = worker_class or PyDictReaderWorker
self._results_queue_reader = worker_class.new_results_queue_reader()
if self.ngram and not self.ngram.timestamp_overlap and shuffle_row_drop_partitions > 1:
raise NotImplementedError('Using timestamp_overlap=False is not implemented with'
' shuffle_options.shuffle_row_drop_partitions > 1')
cache = cache or NullCache()
self._workers_pool = reader_pool or ThreadPool(10)
# Make a schema view (a view is a Unischema containing only a subset of fields).
# Will raise an exception if invalid schema fields are in schema_fields.
if self.ngram:
fields = self.ngram.get_field_names_at_all_timesteps()
else:
fields = schema_fields if isinstance(schema_fields, collections.abc.Iterable) else None
storage_schema = stored_schema.create_schema_view(fields) if fields else stored_schema
if transform_spec:
self.schema = transform_schema(storage_schema, transform_spec)
else:
self.schema = storage_schema
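# Hedged sketch of a transform_spec (function and column names are illustrative): with
# make_batch_reader the transform function receives a pandas DataFrame and returns the
# transformed DataFrame; transform_schema() above adjusts the schema view according to the
# spec's edited/removed fields.
from petastorm.transform import TransformSpec

def _double_value_column(pdf):
    pdf['value'] = pdf['value'] * 2  # 'value' is a hypothetical column name
    return pdf

transform = TransformSpec(_double_value_column)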
If an ngram that is not None was passed to the constructor, the function returns a dictionary structured
according to the NGram definition.
:param rowgroup_spec: A dictionary containing the following fields: 'row_group': ParquetDatasetPiece object
describing a rowgroup to be loaded; 'shuffle_row_drop_partition' a tuple with
(this_partition, num_of_partitions)
:return: A dictionary indexed by field names, or a dictionary defined by NGram spec.
"""
piece = rowgroup_spec['row_group']
shuffle_row_drop_partition = rowgroup_spec['shuffle_row_drop_partition']
# Open the row-group's file using the dataset's filesystem
with self._dataset.fs.open(piece.path) as piece_file_handle:
parquet_file = ParquetFile(piece_file_handle)
if not isinstance(self._local_cache, NullCache):
if self._worker_predicate:
raise RuntimeError('Local cache is not supported together with predicates, '
'unless the dataset is partitioned by the column the predicate operates on.')
if shuffle_row_drop_partition[1] != 1:
raise RuntimeError('Local cache is not supported together with shuffle_row_drop_partitions > 1')
if self._worker_predicate:
all_cols = self._load_rows_with_predicate(parquet_file, piece, self._worker_predicate,
shuffle_row_drop_partition)
else:
# Using a hash of the dataset url together with the relative path in order to:
# 1. Make sure that if a common cache serves multiple processes (e.g. Redis), we don't have conflicts
# 2. The dataset url is hashed to make sure we don't create overly long keys, which may be incompatible with
#    some cache implementations
# 3. Still leave the relative path and the piece_index in plain text to make it easier to debug
cache_key = '{}:{}:{}'.format(
on the ``reader_pool_type`` value).
:return: A :class:`Reader` object
"""
if dataset_url is None or not isinstance(dataset_url, six.string_types):
raise ValueError('dataset_url must be a string')
dataset_url = dataset_url[:-1] if dataset_url[-1] == '/' else dataset_url
logger.debug('dataset_url: %s', dataset_url)
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
filesystem = resolver.filesystem()
dataset_path = resolver.get_dataset_path()
if cache_type is None or cache_type == 'null':
cache = NullCache()
elif cache_type == 'local-disk':
cache = LocalDiskCache(cache_location, cache_size_limit, cache_row_size_estimate, **cache_extra_settings or {})
else:
raise ValueError('Unknown cache_type: {}'.format(cache_type))
try:
dataset_metadata.get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver)
except PetastormMetadataError:
raise RuntimeError('Currently make_reader supports reading only Petastorm datasets. '
'To read from a non-Petastorm Parquet store use make_batch_reader')
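# Hedged usage sketch (the URL is hypothetical): make_reader only reads Petastorm-generated
# datasets; for plain Parquet stores, make_batch_reader should be used instead, as the error above states.
from petastorm import make_reader

with make_reader('file:///tmp/petastorm_dataset',
                 reader_pool_type='process', workers_count=2) as reader:
    for row in reader:
        pass  # each row is a named tuple decoded according to the dataset's Unischema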
if reader_pool_type == 'thread':
reader_pool = ThreadPool(workers_count, results_queue_size)
elif reader_pool_type == 'process':
if pyarrow_serialize:
serializer = PyArrowSerializer()