Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _map_fun(args, ctx):
    """Square every fed batch, then fail on purpose once the feed loop ends.

    Deliberately failing map function: each non-empty batch is squared with
    TensorFlow and handed back through ``batch_results``. After the feed reports
    it should stop, the function sleeps briefly and raises, simulating an error
    in post-feed work.

    Args:
      args: not read by this function.
      ctx: only ``ctx.mgr`` is read; it is passed to ``TFNode.DataFeed``.

    Raises:
      Exception: "FAKE exception after feeding", once feeding has finished.
    """
    import tensorflow as tf

    # The second positional DataFeed argument is train_mode; it is False here.
    feed = TFNode.DataFeed(ctx.mgr, False)
    while not feed.should_stop():
        chunk = feed.next_batch(10)
        if len(chunk) == 0:
            # Nothing to answer for this request; poll the feed again.
            continue
        feed.batch_results(tf.math.square(chunk).numpy())

    # Simulated post-feed work that blows up after a short delay.
    time.sleep(2)
    raise Exception("FAKE exception after feeding")
def _map_fun(args, ctx):
    """Deliberately failing map function that raises while feeding is in progress.

    Squares the first non-empty batch received from the feed, returns the result
    via ``batch_results``, and then raises immediately, before the feed has
    signalled ``should_stop()``.

    Args:
      args: not read by this function.
      ctx: only ``ctx.mgr`` is read; it is passed to ``TFNode.DataFeed``.

    Raises:
      Exception: "FAKE exception during feeding", after the first non-empty batch.
    """
    import tensorflow as tf
    # The second positional DataFeed argument is train_mode; it is False here.
    tf_feed = TFNode.DataFeed(ctx.mgr, False)
    while not tf_feed.should_stop():
        # Requests a batch of 10; the length check below shows it may come back shorter or empty.
        batch = tf_feed.next_batch(10)
        if len(batch) > 0:
            squares = tf.math.square(batch)
            tf_feed.batch_results(squares.numpy())
            # NOTE(review): the raise is placed inside the non-empty-batch branch, so it
            # fires right after the first answered batch — confirm this is the intended scenario.
            raise Exception("FAKE exception during feeding")
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(
loss=tf.keras.losses.sparse_categorical_crossentropy,
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
# single node
# single_worker_model = build_and_compile_cnn_model()
# single_worker_model.fit(x=train_datasets, epochs=3)
tf_feed = TFNode.DataFeed(ctx.mgr, False)
def rdd_generator():
    """Yield ``(image, label)`` examples pulled one record at a time from ``tf_feed``.

    ``image`` is a float32 array scaled by 1/255 and reshaped to (28, 28, 1).
    ``label`` is a float32 array reshaped to (1,).

    Iteration ends when the feed reports ``should_stop()`` or hands back an
    empty batch.
    """
    while not tf_feed.should_stop():
        fetched = tf_feed.next_batch(1)
        if len(fetched) == 0:
            # Treat an empty batch as the end of input.
            return
        record = fetched[0]
        pixels = np.array(record[0]).astype(np.float32) / 255.0
        pixels = np.reshape(pixels, (28, 28, 1))
        target = np.reshape(np.array(record[1]).astype(np.float32), (1,))
        yield (pixels, target)
ds = tf.data.Dataset.from_generator(rdd_generator, (tf.float32, tf.float32), (tf.TensorShape([28, 28, 1]), tf.TensorShape([1])))
ds = ds.batch(args.batch_size)
# convert Keras model to tf.estimator
estimator = tf.keras.estimator.model_to_estimator(model, model_dir=args.model_dir)
# setup train_input_fn for InputMode.TENSORFLOW or InputMode.SPARK
if args.mode == 'train':
if args.input_mode == 'tf':
# For InputMode.TENSORFLOW, just use data in memory
train_input_fn = tf.estimator.inputs.numpy_input_fn(
x={"dense_input": x_train},
y=y_train,
batch_size=128,
num_epochs=None,
shuffle=True)
else: # 'spark'
# For InputMode.SPARK, read data from RDD
tf_feed = TFNode.DataFeed(ctx.mgr)
def rdd_generator():
    """Yield ``(image, label)`` float32 arrays, one fed record at a time.

    Pixel values are divided by 255.0. The label is converted to float32
    unchanged. Empty batches are skipped, and the generator finishes once
    ``tf_feed.should_stop()`` is true.
    """
    while not tf_feed.should_stop():
        records = tf_feed.next_batch(1)
        if not len(records):
            # Nothing delivered this round; check the stop condition and poll again.
            continue
        rec = records[0]
        pixels = numpy.array(rec[0]).astype(numpy.float32) / 255.0
        target = numpy.array(rec[1]).astype(numpy.float32)
        yield (pixels, target)
def train_input_fn():
    """Build the batched training ``tf.data.Dataset`` from ``rdd_generator``.

    Each element is a ``(float32, float32)`` pair with shapes
    ``[IMAGE_PIXELS * IMAGE_PIXELS]`` and ``[10]``. Elements are grouped into
    batches of ``args.batch_size``.
    """
    flat_pixels = IMAGE_PIXELS * IMAGE_PIXELS
    element_types = (tf.float32, tf.float32)
    element_shapes = (tf.TensorShape([flat_pixels]), tf.TensorShape([10]))
    dataset = tf.data.Dataset.from_generator(rdd_generator, element_types, element_shapes)
    return dataset.batch(args.batch_size)
def get_data_feed(self, train_mode=True, qname_in='input', qname_out='output', input_mapping=None):
    """Return a ``TFNode.DataFeed`` bound to this object's ``mgr``.

    Convenience accessor: every argument is forwarded positionally to
    ``TFNode.DataFeed``, after ``self.mgr``, in the order shown in the signature.
    """
    forwarded = (train_mode, qname_in, qname_out, input_mapping)
    return TFNode.DataFeed(self.mgr, *forwarded)
import numpy
import tensorflow as tf
worker_num = ctx.worker_num
job_name = ctx.job_name
task_index = ctx.task_index
# Parameters
IMAGE_PIXELS = 28
hidden_units = 128
# Get TF cluster and server instances
cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
# Create generator for Spark data feed
tf_feed = TFNode.DataFeed(ctx.mgr, args.mode == "train")
def rdd_generator(feed=None):
    """Yield ``(image, label)`` pairs for each record pulled from the data feed.

    Args:
      feed: object exposing ``should_stop()`` and ``next_batch(n)``. Defaults to
        the enclosing ``tf_feed``. The parameter only lets the generator be driven
        by another feed (e.g. in tests); ``from_generator`` still calls it with no
        arguments.

    Yields:
      ``(image, label)``:
        - ``image`` is the record's first field as float32, scaled by 1/255.
        - ``label`` is the record's second field as int64.

    The generator stops when the feed reports ``should_stop()`` or returns an
    empty batch. Previously an empty batch crashed with IndexError from
    ``next_batch(1)[0]``.
    """
    if feed is None:
        feed = tf_feed
    while not feed.should_stop():
        fetched = feed.next_batch(1)
        if len(fetched) == 0:
            # Nothing left to read: end cleanly instead of indexing an empty list.
            return
        record = fetched[0]
        image = numpy.array(record[0])
        image = image.astype(numpy.float32) / 255.0
        label = numpy.array(record[1])
        label = label.astype(numpy.int64)
        yield (image, label)
if job_name == "ps":
server.join()
elif job_name == "worker":
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
class StopFeedHook(tf.estimator.SessionRunHook):
    """SessionRunHook to terminate InputMode.SPARK RDD feeding if the training loop exits before the entire RDD is consumed."""

    def __init__(self, feed):
        # The feed to shut down when the session ends.
        self.feed = feed

    def end(self, session):
        """Terminate the stored feed when the session ends; ``session`` is not used."""
        data_feed = self.feed
        data_feed.terminate()
        # NOTE(review): one more batch is requested after terminate(). The reason is not
        # visible here (presumably to unblock or drain the feeder) — confirm before changing.
        data_feed.next_batch(1)
BUFFER_SIZE = args.buffer_size
BATCH_SIZE = args.batch_size
LEARNING_RATE = args.learning_rate
tf_feed = TFNode.DataFeed(ctx.mgr)
def rdd_generator():
    """Stream ``(image, label)`` examples from ``tf_feed``, one record per request.

    Images become float32 (28, 28, 1) arrays scaled by 1/255. Labels become
    float32 arrays of shape (1,). The stream ends on ``should_stop()`` or on the
    first empty batch.
    """
    def _decode(example):
        # Image first, then label, so any conversion error surfaces in the same order as before.
        img = np.reshape(np.array(example[0]).astype(np.float32) / 255.0, (28, 28, 1))
        lbl = np.reshape(np.array(example[1]).astype(np.float32), (1,))
        return (img, lbl)

    while not tf_feed.should_stop():
        chunk = tf_feed.next_batch(1)
        if not len(chunk):
            return
        yield _decode(chunk[0])
def input_fn(mode, input_context=None):
if mode == tf.estimator.ModeKeys.TRAIN:
# eval_input_fn ALWAYS uses data loaded in memory, since InputMode.SPARK can only feed one RDD at a time
eval_input_fn = tf.estimator.inputs.numpy_input_fn(
x={"dense_input": x_test},
y=y_test,
num_epochs=args.epochs,
shuffle=False)
# setup tf.estimator.train_and_evaluate() w/ FinalExporter
feature_spec = {'dense_input': tf.FixedLenFeature(784, tf.float32)}
exporter = tf.estimator.FinalExporter("serving", serving_input_receiver_fn=tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec))
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=args.steps)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, exporters=exporter)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
else: # mode == 'inference'
if args.input_mode == 'spark':
tf_feed = TFNode.DataFeed(ctx.mgr)
def rdd_generator():
    """Produce ``(image, label)`` float32 pairs from ``tf_feed`` for prediction.

    The image is scaled by 1/255 and the label is cast to float32. Empty batches
    produce nothing; the loop exits once the feed says to stop.
    """
    scale = 255.0
    while not tf_feed.should_stop():
        fetched = tf_feed.next_batch(1)
        if len(fetched) > 0:
            features, target = fetched[0][0], fetched[0][1]
            yield (numpy.array(features).astype(numpy.float32) / scale,
                   numpy.array(target).astype(numpy.float32))
def predict_input_fn():
    """Build the batched prediction ``tf.data.Dataset`` from ``rdd_generator``.

    Elements are ``(float32, float32)`` pairs shaped
    ``[IMAGE_PIXELS * IMAGE_PIXELS]`` and ``[10]``, grouped into batches of
    ``args.batch_size``.
    """
    shapes = (tf.TensorShape([IMAGE_PIXELS * IMAGE_PIXELS]), tf.TensorShape([10]))
    return tf.data.Dataset.from_generator(
        rdd_generator, (tf.float32, tf.float32), shapes
    ).batch(args.batch_size)