from __future__ import division
from functools import partial
import numpy as np
from pyspark.sql.types import LongType
from petastorm import make_reader
from petastorm.codecs import ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
_ShuffleAnalysisSchema = Unischema('_ShuffleAnalysisSchema',
                                   [UnischemaField('id', np.int64, (), ScalarCodec(LongType()), False)])
def generate_shuffle_analysis_dataset(spark, output_dataset_url, num_rows=1000, row_group_size=100):
    """
    Generates a small dataset useful for doing analysis on shuffling algorithms.

    :param spark: spark session
    :param output_dataset_url: location to write the dataset to
    :param num_rows: how many rows the dataset should include
    :param row_group_size: how many rows go into each row group (there is a minimum of 5)
    :return: None
    """
    spark_context = spark.sparkContext
    with materialize_dataset(spark, output_dataset_url, _ShuffleAnalysisSchema):
        # Build one {'id': n} dictionary per row and encode it with the Unischema.
        rows_rdd = spark_context.parallelize(range(num_rows), numSlices=50) \
            .map(lambda x: {'id': x}) \
            .map(partial(dict_to_spark_row, _ShuffleAnalysisSchema))

        # Write the rows out as a parquet dataset, coalesced so that each output file holds
        # roughly row_group_size rows.
        spark.createDataFrame(rows_rdd, _ShuffleAnalysisSchema.as_spark_schema()) \
            .coalesce(max(1, num_rows // row_group_size)) \
            .write.mode('overwrite') \
            .parquet(output_dataset_url)
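
# Usage sketch (an assumption, not part of the original module): generate the dataset locally and
# read the ids back with make_reader. The SparkSession settings and the output URL are
# illustrative choices only.
if __name__ == '__main__':
    from pyspark.sql import SparkSession

    _spark = SparkSession.builder.master('local[2]').getOrCreate()
    _url = 'file:///tmp/shuffle_analysis_dataset'  # hypothetical output location
    generate_shuffle_analysis_dataset(_spark, _url, num_rows=1000, row_group_size=100)
    # Iterate the materialized dataset; each row is a namedtuple with a single 'id' field.
    with make_reader(_url) as reader:
        ids = [row.id for row in reader]
    print('read {} ids'.format(len(ids)))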
if len(unischema_field_objects) + len(regex_patterns) != len(fields):
    raise ValueError('Elements of "fields" must be either a string (regular expressions) or '
                     'an instance of UnischemaField class.')

# For fields that are specified as UnischemaField instances: make sure that this schema contains
# fields with these names.
exact_field_names = [field.name for field in unischema_field_objects]
unknown_field_names = set(exact_field_names) - set(self.fields.keys())
if unknown_field_names:
    raise ValueError('field {} does not belong to the schema {}'.format(unknown_field_names, self))

# Do not use the UnischemaField instances passed in as arguments, since they could carry codec/shape
# info that differs from the one stored in this schema object.
exact_fields = [self._fields[name] for name in exact_field_names]
view_fields = exact_fields + match_unischema_fields(self, regex_patterns)
return Unischema('{}_view'.format(self._name), view_fields)
import numpy as np
from pyspark.sql.types import StringType
from petastorm.codecs import ScalarCodec, CompressedImageCodec
from petastorm.unischema import Unischema, UnischemaField
ImagenetSchema = Unischema('ImagenetSchema', [
    UnischemaField('noun_id', np.string_, (), ScalarCodec(StringType()), False),
    UnischemaField('text', np.string_, (), ScalarCodec(StringType()), False),
    UnischemaField('image', np.uint8, (None, None, 3), CompressedImageCodec('png'), False),
])
if isinstance(field_type, ListType):
    if isinstance(field_type.value_type, ListType) or isinstance(field_type.value_type, pyStructType):
        warnings.warn('[ARROW-1644] Ignoring unsupported structure %r for field %r'
                      % (field_type, column_name))
        continue
try:
    np_type = _numpy_and_codec_from_arrow_type(field_type)
except ValueError:
    if omit_unsupported_fields:
        warnings.warn('Column %r has an unsupported field %r. Ignoring...'
                      % (column_name, field_type))
        continue
    else:
        raise
unischema_fields.append(UnischemaField(column_name, np_type, (), None, arrow_field.nullable))
return Unischema('inferred_schema', unischema_fields)
"""
removed_fields = set(transform_spec.removed_fields)
unknown_field_names = removed_fields - set(schema.fields.keys())
if unknown_field_names:
warnings.warn('remove_fields specified some field names that are not part of the schema. '
'These field names will be ignored "{}". '.format(', '.join(unknown_field_names)))
exclude_fields = {f[0] for f in transform_spec.edit_fields} | removed_fields
fields = [v for k, v in schema.fields.items() if k not in exclude_fields]
for field_to_edit in transform_spec.edit_fields:
edited_unischema_field = UnischemaField(name=field_to_edit[0], numpy_dtype=field_to_edit[1],
shape=field_to_edit[2], codec=None, nullable=field_to_edit[3])
fields.append(edited_unischema_field)
return Unischema(schema._name + '_transformed', fields)
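
# Illustrative usage (an assumption, not taken from the original file): the kind of TransformSpec
# that drives the schema rewrite above. 'image' and 'text' are hypothetical field names; each
# edit_fields entry is a (name, numpy_dtype, shape, nullable) tuple, matching the
# field_to_edit[0..3] indexing in the loop above.
import numpy as np
from petastorm.transform import TransformSpec

example_transform_spec = TransformSpec(
    func=lambda row: row,  # no-op row transform, included only to make the spec complete
    edit_fields=[('image', np.float32, (224, 224, 3), False)],
    removed_fields=['text'])
# Fed into the code above, this spec keeps every untouched field, drops 'text', and re-declares
# 'image' with the edited dtype/shape and codec=None.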
# Since UnischemaField is a tuple, we check isinstance against tuple, which is invariant to pickling.
unischema_field_objects = [f for f in fields if isinstance(f, tuple)]
if len(unischema_field_objects) + len(regex_patterns) != len(fields):
    raise ValueError('Elements of "fields" must be either a string (regular expressions) or '
                     'an instance of UnischemaField class.')

view_fields = unischema_field_objects + match_unischema_fields(self, regex_patterns)
for field in unischema_field_objects:
    # Comparing by field names. Previously this looked for `field not in self._fields.values()`, but that
    # breaks due to faulty pickling: T223683
    if field.name not in self._fields:
        raise ValueError('field {} does not belong to the schema {}'.format(field, self))
return Unischema('{}_view'.format(self._name), view_fields)
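
# Usage sketch (assumption: the fragment above is the body of Unischema.create_schema_view, which
# accepts a mix of regular-expression strings and UnischemaField instances). The schema and field
# names below are hypothetical.
import numpy as np
from pyspark.sql.types import LongType
from petastorm.codecs import NdarrayCodec, ScalarCodec
from petastorm.unischema import Unischema, UnischemaField

_FullSchema = Unischema('FullSchema', [
    UnischemaField('id', np.int64, (), ScalarCodec(LongType()), False),
    UnischemaField('image_rgb', np.uint8, (64, 64, 3), NdarrayCodec(), False),
    UnischemaField('image_depth', np.uint8, (64, 64), NdarrayCodec(), False),
])
# Select 'id' by passing the field object itself, and every 'image_*' field by regular expression.
_id_and_images_view = _FullSchema.create_schema_view([_FullSchema.fields['id'], 'image_.*'])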
if isinstance(field_type, ListType):
    if isinstance(field_type.value_type, ListType) or isinstance(field_type.value_type, pyStructType):
        warnings.warn('[ARROW-1644] Ignoring unsupported structure %r for field %r'
                      % (field_type, column_name))
        continue
try:
    codec, np_type = _numpy_and_codec_from_arrow_type(field_type)
except ValueError:
    if omit_unsupported_fields:
        warnings.warn('Column %r has an unsupported field %r. Ignoring...'
                      % (column_name, field_type))
        continue
    else:
        raise
unischema_fields.append(UnischemaField(column_name, np_type, (), codec, arrow_field.nullable))
return Unischema('inferred_schema', unischema_fields)
"""
This is a minimal example of how to generate a petastorm dataset. Generates a
sample dataset with some random data.
"""
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
# The schema defines what the dataset looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])
def row_generator(x):
    """Returns a single entry of the generated dataset: a bunch of random values keyed by field name."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}
def generate_petastorm_dataset(output_url='file:///tmp/hello_world_dataset'):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # materialize_dataset sets up the spark environment variables needed by petastorm and saves the
    # petastorm-specific metadata when the block exits.
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count)) \
            .map(row_generator) \
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)
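
# Read-back sketch (an assumption, not part of the original example): materialize the dataset and
# iterate over it with make_reader. The URL matches the default output_url above.
if __name__ == '__main__':
    from petastorm import make_reader

    generate_petastorm_dataset()
    with make_reader('file:///tmp/hello_world_dataset') as reader:
        for sample in reader:
            # Each sample is a namedtuple holding the decoded 'id', 'image1' and 'array_4d' values.
            print(sample.id, sample.image1.shape, sample.array_4d.shape)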
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
    libhdfs (java through JNI) or libhdfs3 (C++)
:param user: String denoting username when connecting to HDFS
"""
sc = spark.sparkContext
resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(), hdfs_driver=hdfs_driver,
                              user=spark.sparkContext.sparkUser())
fs = resolver.filesystem()
dataset = pq.ParquetDataset(
    resolver.get_dataset_path(),
    filesystem=fs,
    validate_schema=False)

if unischema_class:
    schema = locate(unischema_class)
    if not isinstance(schema, Unischema):
        raise ValueError('The specified class %s is not an instance of a petastorm.Unischema object.'
                         % unischema_class)
else:
    try:
        schema = get_schema(dataset)
    except ValueError:
        raise ValueError('Unischema class could not be located in existing dataset,'
                         ' please specify it')

# In order to be backwards compatible, we retrieve the common metadata from the dataset before
# overwriting it, to keep the row group indexes and the old row-groups-per-file index.
arrow_metadata = dataset.common_metadata or None

with materialize_dataset(spark, dataset_url, schema, use_summary_metadata=use_summary_metadata,
                         filesystem_factory=resolver.filesystem_factory()):
def dict_to_spark_row(unischema, row_dict):
    """Converts a single row dictionary into a pyspark.Row object.

    Verifies that the data conforms to the unischema definition types and encodes the data using the codec
    specified by the unischema.

    The parameters are keywords to allow use of functools.partial.

    :param unischema: an instance of Unischema object
    :param row_dict: a dictionary where the keys match name of fields in the unischema.
    :return: a single pyspark.Row object
    """
    # Lazy loading pyspark to avoid creating pyspark dependency on data reading code path
    # (currently works only with make_batch_reader)
    from pyspark.sql import Row

    assert isinstance(unischema, Unischema)
    # Add null fields. Be careful not to mutate the input dictionary - that would be an unexpected side effect
    copy_row_dict = copy.copy(row_dict)
    insert_explicit_nulls(unischema, copy_row_dict)

    if set(copy_row_dict.keys()) != set(unischema.fields.keys()):
        raise ValueError('Dictionary fields \n{}\n do not match schema fields \n{}'.format(
            '\n'.join(sorted(copy_row_dict.keys())), '\n'.join(unischema.fields.keys())))

    encoded_dict = {}
    for field_name, value in copy_row_dict.items():
        schema_field = unischema.fields[field_name]
        if value is None and not schema_field.nullable:
            raise ValueError('Field {} is not "nullable", but got passed a None value'.format(field_name))
        if schema_field.codec:
            encoded_dict[field_name] = schema_field.codec.encode(schema_field, value) if value is not None else None
        else:
            # Fields without a codec are passed through unchanged.
            encoded_dict[field_name] = value

    return Row(**encoded_dict)
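
# Usage sketch (an assumption, not part of the original module): encode one dictionary row into a
# pyspark Row. The schema and the values below are illustrative only.
import numpy as np
from pyspark.sql.types import IntegerType
from petastorm.codecs import NdarrayCodec, ScalarCodec
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

_ExampleSchema = Unischema('_ExampleSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('vector', np.float32, (8,), NdarrayCodec(), True),
])
_encoded_row = dict_to_spark_row(_ExampleSchema, {'id': 5,
                                                  'vector': np.zeros(8, dtype=np.float32)})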