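# The fragment below comes from a metadata-regeneration routine: it resolves the dataset's Unischema
# and rewrites the petastorm metadata. It references a SparkSession (spark), a dataset URL
# (dataset_url), a SparkContext (sc), a FilesystemResolver (resolver) and a pyarrow ParquetDataset
# (dataset) created earlier in the original function. A minimal setup sketch, assuming petastorm's
# FilesystemResolver and get_schema helpers; the variable names and the 'libhdfs3' driver value here
# are assumptions:
from pydoc import locate

from pyarrow import parquet as pq

from petastorm.etl.dataset_metadata import get_schema, materialize_dataset
from petastorm.fs_utils import FilesystemResolver
from petastorm.unischema import Unischema

sc = spark.sparkContext
resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(),
                              hdfs_driver='libhdfs3', user=sc.sparkUser())
dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem())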
# Resolve the Unischema: either from the fully-qualified class name that was passed in
# (via pydoc.locate), or from the metadata already stored in the dataset.
if unischema_class:
    schema = locate(unischema_class)
    if not isinstance(schema, Unischema):
        raise ValueError('The specified class %s is not an instance of a petastorm.Unischema object.'
                         % unischema_class)
else:
    try:
        schema = get_schema(dataset)
    except ValueError:
        raise ValueError('Unischema class could not be located in existing dataset,'
                         ' please specify it')
# In order to be backwards compatible, we retrieve the common metadata from the dataset before
# overwriting the metadata to keep row group indexes and the old row group per file index
arrow_metadata = dataset.common_metadata or None
with materialize_dataset(spark, dataset_url, schema, use_summary_metadata=use_summary_metadata,
filesystem_factory=resolver.filesystem_factory()):
if use_summary_metadata:
# Inside the materialize dataset context we just need to write the metadata file as the schema will
# be written by the context manager.
# We use the java ParquetOutputCommitter to write the metadata file for the existing dataset
# which will read all the footers of the dataset in parallel and merge them.
hadoop_config = sc._jsc.hadoopConfiguration()
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
parquet_output_committer = sc._gateway.jvm.org.apache.parquet.hadoop.ParquetOutputCommitter
parquet_output_committer.writeMetaDataFile(hadoop_config, Path(dataset_url))
spark.stop()
if use_summary_metadata and arrow_metadata:
    # Calling writeMetaDataFile overwrites the _common_metadata file, which could have held schema
    # information or row group indexes. We therefore want to retain that information and add it back
    # to the newly written _common_metadata file.
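# The next fragment converts the MNIST train/test splits into petastorm datasets. It assumes a
# Unischema named MnistSchema, a download_mnist_data() helper and a SparkSession builder
# (session_builder) defined elsewhere. A minimal sketch of those pieces, using petastorm's
# Unischema/UnischemaField and codec classes; the exact codecs and integer dtypes used by the
# original example are assumptions here:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import NdarrayCodec, ScalarCodec
from petastorm.unischema import Unischema, UnischemaField

MnistSchema = Unischema('MnistSchema', [
    UnischemaField('idx', np.int64, (), ScalarCodec(IntegerType()), False),
    UnischemaField('digit', np.int64, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image', np.uint8, (28, 28), NdarrayCodec(), False),
])

# The app name is an arbitrary placeholder
session_builder = SparkSession.builder.appName('petastorm_mnist')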
spark = session_builder.getOrCreate()
# Get training and test data
if mnist_data is None:
mnist_data = {
'train': download_mnist_data(download_dir, train=True),
'test': download_mnist_data(download_dir, train=False)
}
# The MNIST data is small enough to do everything here in Python
for dset, data in mnist_data.items():
dset_output_url = '{}/{}'.format(output_url, dset)
# Using row_group_size_mb=1 to avoid having just a single rowgroup in this example. In a real store, the value
# should be similar to an HDFS block size.
with materialize_dataset(spark, dset_output_url, MnistSchema, row_group_size_mb=1):
# Build one dict per sample, keyed by the MnistSchema field names, from the (idx, (image, digit))
# pairs produced by enumerate(data); the PIL image is decoded into a 28x28 uint8 numpy array
idx_image_digit_list = map(lambda idx_image_digit: {
MnistSchema.idx.name: idx_image_digit[0],
MnistSchema.digit.name: idx_image_digit[1][1],
MnistSchema.image.name: np.array(list(idx_image_digit[1][0].getdata()), dtype=np.uint8).reshape(28, 28)
}, enumerate(data))
# Convert to pyspark.sql.Row
sql_rows = map(lambda r: dict_to_spark_row(MnistSchema, r), idx_image_digit_list)
# Write out the result
spark.createDataFrame(sql_rows, MnistSchema.as_spark_schema()) \
    .coalesce(parquet_files_count) \
    .write \
    .option('compression', 'none') \
    .parquet(dset_output_url)
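# Once a split has been written, it can be read back with petastorm's make_reader. A small usage
# sketch, reading the 'train' split from the same output_url used in the loop above:
from petastorm import make_reader

with make_reader('{}/train'.format(output_url)) as reader:
    for row in reader:
        # Each row is a namedtuple keyed by the MnistSchema field names
        print(row.idx, row.digit, row.image.shape)
        break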
schema = get_schema_from_dataset_url(source_url, hdfs_driver=hdfs_driver)
fields = match_unischema_fields(schema, field_regex)
if field_regex and not fields:
    field_names = list(schema.fields.keys())
    raise ValueError('Regular expressions (%s) do not match any fields (%s)'
                     % (str(field_regex), str(field_names)))
if fields:
subschema = schema.create_schema_view(fields)
else:
subschema = schema
resolver = FilesystemResolver(target_url, spark.sparkContext._jsc.hadoopConfiguration(),
hdfs_driver=hdfs_driver, user=spark.sparkContext.sparkUser())
with materialize_dataset(spark, target_url, subschema, row_group_size_mb,
filesystem_factory=resolver.filesystem_factory()):
data_frame = spark.read \
.parquet(source_url)
if fields:
data_frame = data_frame.select(*[f.name for f in fields])
if not_null_fields:
not_null_condition = reduce(operator.__and__, (data_frame[f].isNotNull() for f in not_null_fields))
data_frame = data_frame.filter(not_null_condition)
if partitions_count:
data_frame = data_frame.repartition(partitions_count)
data_frame.write \
    .mode('overwrite' if overwrite_output else 'error') \
    .parquet(target_url)
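# The fragment above copies a petastorm dataset while optionally selecting a subset of fields,
# dropping rows with nulls and repartitioning; it relies on functools.reduce and the operator module
# being imported by the enclosing module. Assuming it is wrapped in a function with a signature along
# the lines of copy_dataset(spark, source_url, target_url, field_regex, not_null_fields,
# overwrite_output, partitions_count, row_group_size_mb), a usage sketch could look like this
# (URLs and field names are placeholders):
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[2]').getOrCreate()
copy_dataset(spark,
             source_url='hdfs:///data/source_dataset',
             target_url='hdfs:///data/copied_dataset',
             field_regex=['^image$', '^label$'],
             not_null_fields=['label'],
             overwrite_output=True,
             partitions_count=None,
             row_group_size_mb=256)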
def generate_petastorm_dataset(output_url='file:///tmp/hello_world_dataset'):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap the dataset materialization portion. This takes care of setting up the Spark environment
    # variables as well as saving the petastorm-specific metadata.
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)
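# The hello-world generator above assumes HelloWorldSchema, row_generator and dict_to_spark_row are
# available. A minimal sketch of those definitions; the field names, shapes and codecs below are
# illustrative assumptions, not the example's actual schema:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import NdarrayCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), NdarrayCodec(), False),
])


def row_generator(x):
    # Produce a single row as a dict keyed by the Unischema field names
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3))}


if __name__ == '__main__':
    generate_petastorm_dataset()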