import numpy as np

from petastorm import make_reader


def compute_correlation_distribution(dataset_url,
                                     id_column,
                                     shuffle_row_drop_partitions,
                                     num_corr_samples=100):
    """
    Compute the correlation distribution of a given shuffle_row_drop_partitions value on an existing dataset.
    Use this to compare two different shuffling options.
    It is encouraged to use a dataset generated by generate_shuffle_analysis_dataset for this analysis.

    :param dataset_url: Dataset url to compute correlation distribution of
    :param id_column: Column where an integer or string id can be found
    :param shuffle_row_drop_partitions: shuffle_row_drop_partitions to test correlation against
    :param num_corr_samples: How many samples of the correlation to take to compute distribution
    :return: (mean, standard deviation) of computed distribution
    """
    # Read the dataset without any shuffling, in order (need to use a dummy pool for this).
    with make_reader(dataset_url,
                     shuffle_row_groups=False,
                     reader_pool_type='dummy') as reader:
        unshuffled = [row[id_column] for row in reader]

    correlations = []
    for _ in range(num_corr_samples):
        with make_reader(dataset_url,
                         shuffle_row_groups=True,
                         shuffle_row_drop_partitions=shuffle_row_drop_partitions) as reader:
            shuffled = [row[id_column] for row in reader]
            correlations.append(abs(np.corrcoef(unshuffled, shuffled)[0, 1]))

    mean = np.mean(correlations)
    std_dev = np.std(correlations)
    return mean, std_dev
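As a quick, hypothetical usage sketch (not part of the original example), the helper above could be run for a couple of candidate shuffle_row_drop_partitions values and the resulting distributions compared side by side; the dataset URL and the 'id' column name below are placeholder assumptions.

# Hypothetical usage sketch: the URL and the 'id' column name are placeholders.
if __name__ == '__main__':
    for drop_partitions in (1, 2):
        mean, std_dev = compute_correlation_distribution(
            'file:///tmp/shuffle_analysis_dataset',
            'id',
            shuffle_row_drop_partitions=drop_partitions,
            num_corr_samples=10)
        print('shuffle_row_drop_partitions=%d: mean=%.4f, std=%.4f'
              % (drop_partitions, mean, std_dev))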
import os

import tensorflow as tf

from petastorm import make_reader
from petastorm.tf_utils import tf_tensors


def train_and_test(dataset_url, training_iterations, batch_size, evaluation_interval):
    """
    Train a model for training_iterations iterations with a batch size of batch_size,
    printing accuracy every evaluation_interval.

    :param dataset_url: The MNIST dataset url.
    :param training_iterations: The training iterations to train for.
    :param batch_size: The batch size for training.
    :param evaluation_interval: The interval used to print the accuracy.
    :return:
    """
    with make_reader(os.path.join(dataset_url, 'train'), num_epochs=None) as train_reader:
        with make_reader(os.path.join(dataset_url, 'test'), num_epochs=None) as test_reader:
            train_readout = tf_tensors(train_reader)
            train_image = tf.cast(tf.reshape(train_readout.image, [784]), tf.float32)
            train_label = train_readout.digit
            batch_image, batch_label = tf.train.batch(
                [train_image, train_label], batch_size=batch_size
            )

            W = tf.Variable(tf.zeros([784, 10]))
            b = tf.Variable(tf.zeros([10]))
            y = tf.matmul(batch_image, W) + b

            # The raw formulation of cross-entropy,
            #
            #   tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(tf.nn.softmax(y)),
            #                                 reduction_indices=[1]))
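            #
            # A possible continuation (hedged sketch, not shown in the original
            # snippet): the raw formulation above can be numerically unstable, so
            # tf.losses.sparse_softmax_cross_entropy is applied to the raw logits
            # in y instead. The 0.5 learning rate is an assumed value.
            loss = tf.losses.sparse_softmax_cross_entropy(labels=batch_label, logits=y)
            train_step = tf.train.GradientDescentOptimizer(0.5).minimize(loss)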
import tensorflow as tf

from petastorm import make_reader
from petastorm.tf_utils import tf_tensors, make_petastorm_dataset


def tensorflow_hello_world(dataset_url='file:///tmp/hello_world_dataset'):
    # Example: tf_tensors will return tensors with dataset data
    with make_reader(dataset_url) as reader:
        tensor = tf_tensors(reader)
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print(sample.id)

    # Example: use tf.data.Dataset API
    with make_reader(dataset_url) as reader:
        dataset = make_petastorm_dataset(reader)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print(sample.id)
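As a small, hedged extension of the tf.data example above (not in the original snippet), the dataset returned by make_petastorm_dataset can be batched with the standard tf.data batch() call before iterating; the function name below is hypothetical.

def tensorflow_batched_hello_world(dataset_url='file:///tmp/hello_world_dataset'):
    # Illustrative sketch: read batches of 4 rows from the petastorm dataset
    # via the tf.data API.
    with make_reader(dataset_url) as reader:
        dataset = make_petastorm_dataset(reader).batch(4)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            batch = sess.run(tensor)
            print(batch.id)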
def __init__(self, input_features, output_features, data_parquet_fp):
    # Open a petastorm reader over the parquet file and cache the row count.
    self.reader = make_reader(data_parquet_fp)
    self.size = self.get_size()
    self.data_parquet_fp = data_parquet_fp

    # Index the input and output feature definitions by feature name.
    self.input_features = {}
    for feature in input_features:
        feature_name = feature['name']
        self.input_features[feature_name] = feature
    self.output_features = {}
    for feature in output_features:
        feature_name = feature['name']
        self.output_features[feature_name] = feature

    # Combined view of all features, seeded from the input features.
    self.features = self.input_features.copy()
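For orientation only, a purely hypothetical instantiation sketch: the class name ParquetFeatureDataset, the feature names, and the path are illustrative stand-ins (the original snippet does not show the class declaration), but they follow the list-of-dicts-with-'name'-keys structure the constructor iterates over.

# Hypothetical usage: class name, feature names, and path are illustrative only.
input_features = [{'name': 'image'}, {'name': 'caption'}]
output_features = [{'name': 'label'}]
dataset = ParquetFeatureDataset(input_features, output_features,
                                'file:///tmp/data.parquet')
print(dataset.size, list(dataset.features))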