from __future__ import division
from functools import partial
import numpy as np
from pyspark.sql.types import LongType
from petastorm import make_reader
from petastorm.codecs import ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
_ShuffleAnalysisSchema = Unischema('_ShuffleAnalysisSchema',
[UnischemaField('id', np.int64, (), ScalarCodec(LongType()), False)])
def generate_shuffle_analysis_dataset(spark, output_dataset_url, num_rows=1000, row_group_size=100):
"""
    Generates a small dataset useful for analyzing shuffling algorithms.

    :param spark: spark session
    :param output_dataset_url: location to write the dataset
    :param num_rows: number of rows the dataset should include
    :param row_group_size: number of rows in each row group (there is a minimum of 5)
:return:
"""
spark_context = spark.sparkContext
with materialize_dataset(spark, output_dataset_url, _ShuffleAnalysisSchema):
        rows_rdd = spark_context.parallelize(range(num_rows), numSlices=50) \
            .map(lambda i: {'id': i}) \
            .map(partial(dict_to_spark_row, _ShuffleAnalysisSchema))
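        # Minimal sketch of the write step (an assumption following the pattern of
        # petastorm examples; the real code may additionally honor the row_group_size
        # argument when writing). Materializes the encoded rows as a single parquet file.
        spark.createDataFrame(rows_rdd, _ShuffleAnalysisSchema.as_spark_schema()) \
            .coalesce(1) \
            .write \
            .mode('overwrite') \
            .parquet(output_dataset_url)


# Hypothetical usage: generate the dataset, then stream it back with make_reader
# (imported above but otherwise unused in this snippet). Paths and counts are
# illustrative only.
#
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.master('local[2]').getOrCreate()
#   generate_shuffle_analysis_dataset(spark, 'file:///tmp/shuffle_analysis', num_rows=1000)
#   with make_reader('file:///tmp/shuffle_analysis') as reader:
#       ids = [row.id for row in reader]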
import numpy as np
from pyspark.sql.types import StringType
from petastorm.codecs import ScalarCodec, CompressedImageCodec
from petastorm.unischema import Unischema, UnischemaField
ImagenetSchema = Unischema('ImagenetSchema', [
UnischemaField('noun_id', np.string_, (), ScalarCodec(StringType()), False),
UnischemaField('text', np.string_, (), ScalarCodec(StringType()), False),
UnischemaField('image', np.uint8, (None, None, 3), CompressedImageCodec('png'), False),
])
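# Hypothetical example row for ImagenetSchema (the noun id, caption and image values
# are made up). dict_to_spark_row encodes each field with its declared codec, so the
# uint8 image array is PNG-compressed before being handed to Spark.
from petastorm.unischema import dict_to_spark_row

example_row = {'noun_id': 'n01440764',
               'text': 'tench, Tinca tinca',
               'image': np.zeros((64, 64, 3), dtype=np.uint8)}
encoded_row = dict_to_spark_row(ImagenetSchema, example_row)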
"""
    Convert an Apache Arrow schema into a Unischema object. This is useful for datasets of only scalars,
    which need no special encoding/decoding. If there is an unsupported type in the Arrow schema, an
    exception is raised.
    When omit_unsupported_fields is set to True, unsupported column types only produce warnings instead.

    :param parquet_dataset: :class:`pyarrow.parquet.ParquetDataset` whose Arrow schema is converted
:param omit_unsupported_fields: :class:`Boolean`
:return: A :class:`Unischema` object.
"""
meta = parquet_dataset.pieces[0].get_metadata(parquet_dataset.fs.open)
arrow_schema = meta.schema.to_arrow_schema()
unischema_fields = []
for partition_name in parquet_dataset.partitions.partition_names:
unischema_fields.append(UnischemaField(partition_name, np.str_, (), ScalarCodec(StringType()), False))
for column_name in arrow_schema.names:
        arrow_field = arrow_schema.field_by_name(column_name)
        field_type = arrow_field.type
        if isinstance(field_type, ListType):
            if isinstance(field_type.value_type, ListType) or isinstance(field_type.value_type, pyStructType):
                warnings.warn('[ARROW-1644] Ignoring unsupported structure %r for field %r'
                              % (field_type, column_name))
                continue
        try:
            codec, np_type = _numpy_and_codec_from_arrow_type(field_type)
        except ValueError:
            if omit_unsupported_fields:
                warnings.warn('Column %r has an unsupported field %r. Ignoring...'
                              % (column_name, field_type))
                continue
            else:
                raise
        unischema_fields.append(UnischemaField(column_name, np_type, (), codec, arrow_field.nullable))
    return Unischema('inferred_schema', unischema_fields)
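# Hypothetical usage sketch: the loop above is presumably the body of petastorm's
# Unischema.from_arrow_schema (the enclosing def line is not shown in this snippet,
# so the call below assumes that name). The dataset path is illustrative only.
import pyarrow.parquet as pq

example_dataset = pq.ParquetDataset('/tmp/hello_world_dataset')
example_schema = Unischema.from_arrow_schema(example_dataset, omit_unsupported_fields=True)
print(example_schema)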
    Instead, compare all field attributes, comparing codecs only by their type rather than by instance.
Future: Give codec a mime identifier.
"""
return _fields_as_tuple(self) == _fields_as_tuple(other)
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(_fields_as_tuple(self))
# Defines default arguments for UnischemaField namedtuple:
# Makes the signature equivalent to UnischemaField(name, numpy_dtype, shape, codec=None, nullable=False)
UnischemaField.__new__.__defaults__ = (None, False)
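# A short, hypothetical illustration of the semantics above: two fields that differ
# only in their codec *instances* still compare equal (and hash equally), because only
# the codec type takes part in the comparison, and codec/nullable now default to
# None/False thanks to the patched __defaults__.
import numpy as np
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec

field_a = UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False)
field_b = UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False)
assert field_a == field_b and hash(field_a) == hash(field_b)

field_c = UnischemaField('id', np.int32, ())  # codec=None, nullable=False by default
assert field_c.codec is None and not field_c.nullable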
class _NamedtupleCache(object):
"""_NamedtupleCache makes sure the same instance of a namedtuple is returned for a given schema and a set of
fields. This makes comparison between types possible. For example, `tf.data.Dataset.concatenate` implementation
compares types to make sure two datasets can be concatenated."""
_store = dict()
@staticmethod
def get(parent_schema_name, field_names):
"""Creates a nametuple with field_names as values. Returns an existing instance if was already created.
:param parent_schema_name: Schema name becomes is part of the cache key
:param field_names: defines names of the fields in the namedtuple created/returned. Also part of the cache key.
:return: A namedtuple with field names defined by `field_names`
"""
:param schema: A pre-transform schema
:param transform_spec: a TransformSpec object with mutation instructions.
:return: A post-transform schema
"""
removed_fields = set(transform_spec.removed_fields)
unknown_field_names = removed_fields - set(schema.fields.keys())
if unknown_field_names:
        warnings.warn('removed_fields specified field names that are not part of the schema '
                      'and will be ignored: "{}"'.format(', '.join(unknown_field_names)))
exclude_fields = {f[0] for f in transform_spec.edit_fields} | removed_fields
fields = [v for k, v in schema.fields.items() if k not in exclude_fields]
for field_to_edit in transform_spec.edit_fields:
edited_unischema_field = UnischemaField(name=field_to_edit[0], numpy_dtype=field_to_edit[1],
shape=field_to_edit[2], codec=None, nullable=field_to_edit[3])
fields.append(edited_unischema_field)
return Unischema(schema._name + '_transformed', fields)
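# Hypothetical usage sketch. The body above is presumably petastorm's transform_schema()
# (its def line is not shown in this snippet, so the function name is an assumption);
# the toy schema, field names and pass-through transform below are made up for
# illustration.
import numpy as np
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec
from petastorm.transform import TransformSpec
from petastorm.unischema import Unischema, UnischemaField

toy_schema = Unischema('ToySchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('label', np.int32, (), ScalarCodec(IntegerType()), False),
])

# Promote 'id' to int64 and drop 'label'. The transform function itself does not
# affect the schema transformation, so a pass-through lambda is enough here.
toy_spec = TransformSpec(lambda x: x,
                         edit_fields=[('id', np.int64, (), False)],
                         removed_fields=['label'])
# post_transform_schema = transform_schema(toy_schema, toy_spec)  # -> 'ToySchema_transformed'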
"""
This is a minimal example of how to generate a petastorm dataset. Generates a
sample dataset with some random data.
"""
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
# The schema defines how the dataset looks
HelloWorldSchema = Unischema('HelloWorldSchema', [
UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])
def row_generator(x):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return {'id': x,
'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}
def generate_petastorm_dataset(output_url='file:///tmp/hello_world_dataset'):
rowgroup_size_mb = 256
spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
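    sc = spark.sparkContext

    # A minimal sketch of the materialization step, following the pattern suggested by
    # the imports above (materialize_dataset + dict_to_spark_row); the row count of 10
    # is an illustrative choice. materialize_dataset wraps the write so petastorm
    # metadata is stored alongside the parquet files.
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count)) \
            .map(row_generator) \
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)


if __name__ == '__main__':
    # Hypothetical driver: generate the dataset, then read it back with make_reader.
    from petastorm import make_reader

    generate_petastorm_dataset()
    with make_reader('file:///tmp/hello_world_dataset') as reader:
        for sample in reader:
            print(sample.id, sample.image1.shape)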