# Write the Parquet summary metadata file through the JVM ParquetOutputCommitter,
# which will read all the footers of the dataset in parallel and merge them.
hadoop_config = sc._jsc.hadoopConfiguration()
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
parquet_output_committer = sc._gateway.jvm.org.apache.parquet.hadoop.ParquetOutputCommitter
parquet_output_committer.writeMetaDataFile(hadoop_config, Path(dataset_url))
spark.stop()
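
# --- Illustrative sketch (not part of the original flow) ---
# A minimal, pure-pyarrow approximation of what the ParquetOutputCommitter call above
# achieves: read the footer of every parquet file and merge them into a single summary
# metadata file. The helper name and the flat local-directory layout are assumptions
# made for illustration only.
import glob
import os

import pyarrow.parquet as pq


def _write_summary_metadata_sketch(dataset_dir):
    """Merge per-file footers into a Spark-style '_metadata' summary file."""
    merged_metadata = None
    for file_path in sorted(glob.glob(os.path.join(dataset_dir, '*.parquet'))):
        file_metadata = pq.read_metadata(file_path)
        # Record the relative file path so readers can locate each row group.
        file_metadata.set_file_path(os.path.basename(file_path))
        if merged_metadata is None:
            merged_metadata = file_metadata
        else:
            merged_metadata.append_row_groups(file_metadata)
    if merged_metadata is not None:
        merged_metadata.write_metadata_file(os.path.join(dataset_dir, '_metadata'))
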
if use_summary_metadata and arrow_metadata:
    # Calling writeMetaDataFile overwrites the _common_metadata file, which may already hold
    # schema information or row group indexes. We therefore retain that information and add it
    # back to the new _common_metadata file. With the old legacy metadata method this file
    # won't be deleted.
    base_schema = arrow_metadata.schema.to_arrow_schema()
    metadata_dict = base_schema.metadata
    if ROW_GROUPS_PER_FILE_KEY in metadata_dict:
        add_to_dataset_metadata(dataset, ROW_GROUPS_PER_FILE_KEY, metadata_dict[ROW_GROUPS_PER_FILE_KEY])
    if ROWGROUPS_INDEX_KEY in metadata_dict:
        add_to_dataset_metadata(dataset, ROWGROUPS_INDEX_KEY, metadata_dict[ROWGROUPS_INDEX_KEY])
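
# --- Illustrative sketch (not part of the original flow) ---
# A minimal sketch of how a single key/value pair can be preserved in the schema-level
# metadata of a '_common_metadata' file using pyarrow alone. It assumes that
# add_to_dataset_metadata performs a merge-and-rewrite along these lines; the
# 'common_metadata_path' argument is an assumption made for illustration only.
import pyarrow.parquet as pq


def _add_key_to_common_metadata_sketch(common_metadata_path, key, value):
    """Merge one key/value pair into the schema metadata and rewrite the file."""
    existing_schema = pq.read_metadata(common_metadata_path).schema.to_arrow_schema()
    merged = dict(existing_schema.metadata or {})
    merged[key] = value
    # pq.write_metadata writes a metadata-only parquet file from an Arrow schema.
    pq.write_metadata(existing_schema.with_metadata(merged), common_metadata_path)
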
def _generate_unischema_metadata(dataset, schema):
"""
Generates the serialized unischema and adds it to the dataset parquet metadata to be used upon reading.
:param dataset: (ParquetDataset) Dataset to attach schema
:param schema: (Unischema) Schema to attach to dataset
:return: None
"""
# TODO(robbieg): Simply pickling unischema will break if the UnischemaField class is changed,
# or the codec classes are changed. We likely need something more robust.
assert schema
serialized_schema = pickle.dumps(schema)
utils.add_to_dataset_metadata(dataset, UNISCHEMA_KEY, serialized_schema)
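
# --- Illustrative sketch (not part of the original flow) ---
# The read-side counterpart of _generate_unischema_metadata above: load the pickled
# Unischema back from the schema-level metadata. It assumes the UNISCHEMA_KEY bytes
# end up in the dataset's '_common_metadata' file; 'common_metadata_path' is an
# assumption made for illustration only.
import pickle

import pyarrow.parquet as pq


def _read_unischema_sketch(common_metadata_path):
    """Unpickle the Unischema stored under UNISCHEMA_KEY, or return None if absent."""
    arrow_schema = pq.read_metadata(common_metadata_path).schema.to_arrow_schema()
    serialized_schema = (arrow_schema.metadata or {}).get(UNISCHEMA_KEY)
    return pickle.loads(serialized_schema) if serialized_schema is not None else None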