def compute_correlation_distribution(dataset_url,  # function name assumed; leading parameters reconstructed from the docstring
                                     id_column,
                                     shuffle_row_drop_partitions,
                                     num_corr_samples=100):
    """
    Compute the correlation distribution of a given shuffle_row_drop_partitions setting on an existing dataset.
    Use this to compare two different shuffling options.
    It is encouraged to use a dataset generated by generate_shuffle_analysis_dataset for this analysis.

    :param dataset_url: Dataset url to compute correlation distribution of
    :param id_column: Column where an integer or string id can be found
    :param shuffle_row_drop_partitions: shuffle_row_drop_partitions to test correlation against
    :param num_corr_samples: How many samples of the correlation to take to compute distribution
    :return: (mean, standard deviation) of computed distribution
    """
    # Read the dataset in order, without any shuffling (a dummy pool is needed for this).
    with make_reader(dataset_url,
                     shuffle_row_groups=False,
                     reader_pool_type='dummy') as reader:
        unshuffled = [row[id_column] for row in reader]

    correlations = []
    for _ in range(num_corr_samples):
        with make_reader(dataset_url,
                         shuffle_row_groups=True,
                         shuffle_row_drop_partitions=shuffle_row_drop_partitions) as reader:
            shuffled = [row[id_column] for row in reader]
        correlations.append(abs(np.corrcoef(unshuffled, shuffled)[0, 1]))
    mean = np.mean(correlations)
    std_dev = np.std(correlations)
    return mean, std_dev
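

# A hedged end-to-end usage sketch for the two analysis helpers in this module, assuming the
# function above is indeed named compute_correlation_distribution (the name is reconstructed)
# and that generate_shuffle_analysis_dataset (shown below) is also defined. The Spark master
# and the output URL are placeholder assumptions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[2]').getOrCreate()
analysis_url = 'file:///tmp/shuffle_analysis'

generate_shuffle_analysis_dataset(spark, analysis_url, num_rows=1000, row_group_size=100)

# A lower mean correlation means the shuffle decorrelates the read order from the
# on-disk order more effectively.
mean, std_dev = compute_correlation_distribution(analysis_url,
                                                 'id',
                                                 shuffle_row_drop_partitions=2,
                                                 num_corr_samples=20)
print('mean correlation: %f, std dev: %f' % (mean, std_dev))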
from __future__ import division

from functools import partial

import numpy as np
from pyspark.sql.types import LongType

from petastorm import make_reader
from petastorm.codecs import ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

_ShuffleAnalysisSchema = Unischema('_ShuffleAnalysisSchema',
                                   [UnischemaField('id', np.int64, (), ScalarCodec(LongType()), False)])


def generate_shuffle_analysis_dataset(spark, output_dataset_url, num_rows=1000, row_group_size=100):
    """
    Generates a small dataset useful for doing analysis on shuffling algorithms.

    :param spark: spark session
    :param output_dataset_url: location to write dataset
    :param num_rows: how many rows should the dataset include
    :param row_group_size: how many rows in each row group (there is a minimum of 5)
    :return:
    """
    spark_context = spark.sparkContext
    with materialize_dataset(spark, output_dataset_url, _ShuffleAnalysisSchema):
        rows_rdd = spark_context.parallelize(range(num_rows), numSlices=50) \
            .map(lambda i: {'id': i})
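
        # The excerpt stops mid-pipeline; what follows is a minimal, hedged completion sketch.
        # The dict rows are converted to Spark Rows with the imported dict_to_spark_row helper,
        # and the coalesce factor (an assumption) aims for roughly row_group_size rows per output file.
        rows_rdd = rows_rdd.map(partial(dict_to_spark_row, _ShuffleAnalysisSchema))

        spark.createDataFrame(rows_rdd, _ShuffleAnalysisSchema.as_spark_schema()) \
            .coalesce(max(1, num_rows // row_group_size)) \
            .write \
            .mode('overwrite') \
            .parquet(output_dataset_url)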
# Typical training usage would use the `all_epochs` approach.
#
if args.all_epochs:
    # Run training across all the epochs before testing for accuracy
    loop_epochs = 1
    reader_epochs = args.epochs
else:
    # Test training accuracy after each epoch
    loop_epochs = args.epochs
    reader_epochs = 1

transform = TransformSpec(_transform_row, removed_fields=['idx'])

# Instantiate each petastorm Reader with a single thread, shuffle enabled, and the appropriate epoch setting
for epoch in range(1, loop_epochs + 1):
    with DataLoader(make_reader('{}/train'.format(args.dataset_url), num_epochs=reader_epochs,
                                transform_spec=transform),
                    batch_size=args.batch_size) as train_loader:
        train(model, device, train_loader, args.log_interval, optimizer, epoch)
    with DataLoader(make_reader('{}/test'.format(args.dataset_url), num_epochs=reader_epochs,
                                transform_spec=transform),
                    batch_size=args.test_batch_size) as test_loader:
        test(model, device, test_loader)
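

# The `_transform_row` function passed to TransformSpec above is not part of this excerpt.
# Below is a hedged sketch of what such a per-row transform could look like for the MNIST
# dataset: it normalizes the image into a float32 tensor and keeps the label, while the
# 'idx' field is dropped via removed_fields. The field names ('image', 'digit') follow the
# MNIST schema used in the TensorFlow example further down and are otherwise assumptions.
import numpy as np


def _transform_row_sketch(mnist_row):
    # Scale pixel values to [0, 1] and add a channel dimension expected by the model.
    image = (mnist_row['image'].astype(np.float32) / 255.0).reshape(1, 28, 28)
    return {'image': image, 'digit': mnist_row['digit']}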
def train_and_test(dataset_url, training_iterations, batch_size, evaluation_interval):
    """
    Train a model for training_iterations iterations with a batch size of batch_size,
    printing accuracy every evaluation_interval iterations.

    :param dataset_url: The MNIST dataset url.
    :param training_iterations: The training iterations to train for.
    :param batch_size: The batch size for training.
    :param evaluation_interval: The interval used to print the accuracy.
    :return:
    """
    with make_reader(os.path.join(dataset_url, 'train'), num_epochs=None) as train_reader:
        with make_reader(os.path.join(dataset_url, 'test'), num_epochs=None) as test_reader:
            train_readout = tf_tensors(train_reader)
            train_image = tf.cast(tf.reshape(train_readout.image, [784]), tf.float32)
            train_label = train_readout.digit
            batch_image, batch_label = tf.train.batch(
                [train_image, train_label], batch_size=batch_size
            )

            W = tf.Variable(tf.zeros([784, 10]))
            b = tf.Variable(tf.zeros([10]))
            y = tf.matmul(batch_image, W) + b

            # The raw formulation of cross-entropy,
            #
            #   tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(tf.nn.softmax(y)),
            #                                 reduction_indices=[1]))
            #
            # can be numerically unstable.
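            #
            # A hedged continuation sketch (not shown in the excerpt above): the usual
            # numerically stable alternative applies the softmax and the cross-entropy in
            # one fused op on the raw logits `y`, then trains with plain SGD. The specific
            # loss and optimizer choices here are assumptions for illustration.
            cross_entropy = tf.losses.sparse_softmax_cross_entropy(labels=batch_label, logits=y)
            train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)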
    the following fields: `time_mean`, `samples_per_second`, `memory_info` and `cpu`
    """
    if not reader_extra_args:
        reader_extra_args = dict()

    if spawn_new_process:
        args = copy.deepcopy(locals())
        args['spawn_new_process'] = False
        executor = ProcessPoolExecutor(1)
        future = executor.submit(reader_throughput, **args)
        return future.result()

    logger.info('Arguments: %s', locals())

    if 'schema_fields' not in reader_extra_args:
        unischema_fields = match_unischema_fields(get_schema_from_dataset_url(dataset_url), field_regex)
        reader_extra_args['schema_fields'] = unischema_fields

    logger.info('Fields used in the benchmark: %s', str(reader_extra_args['schema_fields']))

    with make_reader(dataset_url,
                     num_epochs=None,
                     reader_pool_type=str(pool_type), workers_count=loaders_count,
                     pyarrow_serialize=pyarrow_serialize,
                     **reader_extra_args) as reader:

        if read_method == ReadMethod.PYTHON:
            result = _time_warmup_and_work(reader, warmup_cycles_count, measure_cycles_count)
        elif read_method == ReadMethod.TF:
            result = _time_warmup_and_work_tf(reader, warmup_cycles_count, measure_cycles_count,
                                              shuffling_queue_size, min_after_dequeue)
        else:
            raise RuntimeError('Unexpected read_method value: %s' % str(read_method))
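

# The _time_warmup_and_work helper referenced above is not part of this excerpt. Below is a
# minimal sketch of what such a timing loop could look like; the BenchmarkResult container
# and its construction are assumptions based on the fields named in the docstring above
# (`time_mean`, `samples_per_second`, `memory_info`, `cpu`).
import time
from collections import namedtuple

import psutil

BenchmarkResult = namedtuple('BenchmarkResult', ['time_mean', 'samples_per_second', 'memory_info', 'cpu'])


def _time_warmup_and_work_sketch(reader, warmup_cycles_count, measure_cycles_count):
    # Warm up: pull some samples so worker pools and caches are primed before timing.
    for _ in range(warmup_cycles_count):
        next(reader)

    # Timed section: read measure_cycles_count samples and compute the mean time per sample.
    start = time.time()
    for _ in range(measure_cycles_count):
        next(reader)
    elapsed = time.time() - start

    time_mean = elapsed / measure_cycles_count
    process = psutil.Process()
    return BenchmarkResult(time_mean=time_mean,
                           samples_per_second=1.0 / time_mean if time_mean else float('inf'),
                           memory_info=process.memory_info(),
                           cpu=process.cpu_percent())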
    def __init__(self, predicate_list, reduce_func):  # constructor header and validation loop inferred from the assignments below
        for predicate in predicate_list:
            if not isinstance(predicate, PredicateBase):
                raise ValueError('Predicate is not derived from PredicateBase')
        self._predicate_list = predicate_list
        self._reduce_func = reduce_func

    def get_fields(self):
        fields = set()
        for p in self._predicate_list:
            fields |= p.get_fields()
        return fields

    def do_include(self, values):
        include_list = [p.do_include(values) for p in self._predicate_list]
        return self._reduce_func(include_list)


class in_pseudorandom_split(PredicateBase):
    """ Split the dataset according to a split list, based on volume_guid.

    The split is pseudorandom (a seed cannot be supplied yet), i.e. the split outcome is always the same.
    The split is performed by hashing volume_guid uniformly onto the [0, 1] range and returning the part
    of the full dataset that was hashed into the given sub-range.

    Example:
        'split_list = [0.5, 0.2, 0.3]' - the dataset will be split into three subsets in proportion:
            subset 1: 0.5 of the log data
            subset 2: 0.2 of the log data
            subset 3: 0.3 of the log data

    Note that the split is not exact, so avoid very small fractions (e.g. 0.001) to prevent empty subsets.
    """

    def __init__(self, fraction_list, subset_index, predicate_field):
        """ fraction_list: a list of log-data fractions (real numbers in the range [0, 1])
        subset_index: defines which subset will be used by the Reader
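

# A hedged usage sketch for in_pseudorandom_split (the dataset URL and the 'volume_guid'
# field name are assumptions for illustration): select the first 50% split of a
# [0.5, 0.2, 0.3] partitioning and hand it to make_reader as a row predicate.
from petastorm import make_reader
from petastorm.predicates import in_pseudorandom_split

train_split_predicate = in_pseudorandom_split([0.5, 0.2, 0.3],
                                              subset_index=0,
                                              predicate_field='volume_guid')

with make_reader('hdfs:///path/to/dataset', predicate=train_split_predicate) as reader:
    for row in reader:
        pass  # process only the rows that hash into the first 50% sub-range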