:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
    libhdfs (java through JNI) or libhdfs3 (C++).
:return: None. Upon successful completion the row group indexes are saved to the dataset's _metadata file.
"""
if dataset_url and dataset_url[-1] == '/':
    dataset_url = dataset_url[:-1]

# Create pyarrow file system
resolver = FilesystemResolver(dataset_url, spark_context._jsc.hadoopConfiguration(),
                              hdfs_driver=hdfs_driver, user=spark_context.sparkUser())
dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
                            validate_schema=False)
split_pieces = dataset_metadata.load_row_groups(dataset)
schema = dataset_metadata.get_schema(dataset)
# Keep a direct reference to the partitions object so it can be captured by the Spark closure below
partitions = dataset.partitions
pieces_num = len(split_pieces)
piece_info_list = []
for piece_index in range(pieces_num):
    # The index relies on the ordering of the split dataset pieces. How the pieces are split and
    # sorted should not change, but if it ever does, this assumption will break the index.
    piece = split_pieces[piece_index]
    piece_info_list.append(PieceInfo(piece_index, piece.path, piece.row_group, piece.partition_keys))
start_time = time.time()
piece_info_rdd = spark_context.parallelize(piece_info_list, min(len(piece_info_list), PARALLEL_SLICE_NUM))
indexer_rdd = piece_info_rdd.map(lambda piece_info: _index_columns(piece_info, dataset_url, partitions,
                                                                   indexers, schema, hdfs_driver=hdfs_driver))
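
# Usage sketch (not part of the original snippet, which is cut off before the indexers are
# collected and written back to the dataset metadata): a minimal example of how the indexing
# routine above might be driven. The build_rowgroup_index signature and the SingleFieldIndexer
# helper are assumptions based on petastorm's public API and may differ between versions.
from pyspark.sql import SparkSession
from petastorm.etl.rowgroup_indexing import build_rowgroup_index
from petastorm.etl.rowgroup_indexers import SingleFieldIndexer

spark = SparkSession.builder.appName('rowgroup-indexing').getOrCreate()
# Build a single-field index named 'id_index' over a hypothetical 'id' field of an existing dataset.
build_rowgroup_index('hdfs:///path/to/dataset', spark.sparkContext,
                     [SingleFieldIndexer('id_index', 'id')],
                     hdfs_driver='libhdfs')
spark.stop()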

# ---- Separate snippet: regenerating petastorm metadata for an existing dataset. The opening of
# this fragment was truncated; the resolver call below is completed to mirror the pattern used
# above (an assumption, not the verbatim original). ----
resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(), hdfs_driver=hdfs_driver,
                              user=spark.sparkContext.sparkUser())
fs = resolver.filesystem()
dataset = pq.ParquetDataset(
    resolver.get_dataset_path(),
    filesystem=fs,
    validate_schema=False)

if unischema_class:
    schema = locate(unischema_class)
    if not isinstance(schema, Unischema):
        raise ValueError('The specified class %s is not an instance of a petastorm.Unischema object.'
                         % unischema_class)
else:
    try:
        schema = get_schema(dataset)
    except ValueError:
        raise ValueError('Unischema class could not be located in existing dataset,'
                         ' please specify it')

# For backwards compatibility, retrieve the existing common metadata before it is overwritten,
# so that the row group indexes and the old per-file row group index can be carried over.
arrow_metadata = dataset.common_metadata or None

with materialize_dataset(spark, dataset_url, schema, use_summary_metadata=use_summary_metadata,
                         filesystem_factory=resolver.filesystem_factory()):
    if use_summary_metadata:
        # Inside the materialize_dataset context we only need to write the summary metadata file;
        # the schema itself is written by the context manager on exit.
        # Use the Java ParquetOutputCommitter to write the metadata file for the existing dataset:
        # it reads all of the dataset's footers in parallel and merges them.
        hadoop_config = sc._jsc.hadoopConfiguration()
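        # Hypothetical continuation (the original snippet is cut off here): a sketch of how the
        # Java committer could be invoked through the py4j gateway to merge the footers into a
        # summary _metadata file. The exact calls are an assumption, not petastorm's verbatim code.
        Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
        parquet_output_committer = sc._gateway.jvm.org.apache.parquet.hadoop.ParquetOutputCommitter
        parquet_output_committer.writeMetaDataFile(hadoop_config, Path(dataset_url))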

# ---- Separate snippet: a command-line tool that prints the schema and row group indexes stored
# in the metadata of a petastorm dataset. The argparse setup is truncated; the dangling help
# string belonged to an --hdfs-driver option, and the add_argument call is completed here as an
# assumption. ----
parser.add_argument('--hdfs-driver', type=str, default='libhdfs3',
                    help='A string denoting the hdfs driver to use (if using a dataset on hdfs). '
                         'Current choices are libhdfs (java through JNI) or libhdfs3 (C++)')

args = parser.parse_args()

if args.dataset_url and args.dataset_url[-1] == '/':
    args.dataset_url = args.dataset_url[:-1]

# Create pyarrow file system
resolver = FilesystemResolver(args.dataset_url, hdfs_driver=args.hdfs_driver)
dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
                            validate_schema=False)

print_all = not args.schema and not args.index

if args.schema or print_all:
    print('*** Schema from dataset metadata ***')
    print(dataset_metadata.get_schema(dataset))
if args.index or print_all:
    index_dict = rowgroup_indexing.get_row_group_indexes(dataset)
    print('*** Row group indexes from dataset metadata ***')
    for index_name in index_dict:
        print('Index: {}'.format(index_name))
        if args.skip_index is None or index_name not in args.skip_index:
            for field_value in index_dict[index_name].indexed_values:
                print(' -- {}({})'.format(field_value,
                                          len(index_dict[index_name].get_row_group_indexes(field_value))))
                if args.print_values:
                    print(index_dict[index_name].get_row_group_indexes(field_value))
        else:
            print('  (skipped)')
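
# Sketch (not part of the original snippet): an argparse setup consistent with the options the
# tool above references (dataset_url, --schema, --index, --skip-index, --print-values). Argument
# names, types, and help texts are assumptions for illustration only.
import argparse

parser = argparse.ArgumentParser(description='Print the schema and row group indexes stored in '
                                             'the metadata of a petastorm dataset.')
parser.add_argument('dataset_url', type=str, help='URL of the dataset, e.g. hdfs:///path/to/dataset')
parser.add_argument('--schema', action='store_true', help='Print only the schema')
parser.add_argument('--index', action='store_true', help='Print only the row group indexes')
parser.add_argument('--skip-index', type=str, nargs='+', default=None,
                    help='Names of indexes whose values should not be listed')
parser.add_argument('--print-values', action='store_true',
                    help='Also print the row group numbers covered by each indexed value')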