Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Normalization of absolute & relative string paths depending on filesystem"""
cwd = os.getcwd()
user = getpass.getuser()
fs = ["file://", "hdfs://", "viewfs://"]
# Maps each input path to the result expected under each default filesystem;
# every list is ordered to line up with the entries of `fs`.
paths = {
    "hdfs://foo/bar": ["hdfs://foo/bar", "hdfs://foo/bar", "hdfs://foo/bar"],
    "viewfs://foo/bar": ["viewfs://foo/bar", "viewfs://foo/bar", "viewfs://foo/bar"],
    "file://foo/bar": ["file://foo/bar", "file://foo/bar", "file://foo/bar"],
    "/foo/bar": ["file:///foo/bar", "hdfs:///foo/bar", "viewfs:///foo/bar"],
    "foo/bar": ["file://{}/foo/bar".format(cwd), "hdfs:///user/{}/foo/bar".format(user), "viewfs:///user/{}/foo/bar".format(user)],
}
# `i` selects the expected value matching the default filesystem under test.
for i, default_fs in enumerate(fs):
    # Minimal stand-in context carrying only the two attributes set here.
    ctx = type('MockContext', (), {'defaultFS': default_fs, 'working_dir': cwd})
    for path, expected in paths.items():
        final_path = TFNode.hdfs_path(ctx, path)
        self.assertEqual(final_path, expected[i], "fs({}) + path({}) => {}, expected {}".format(default_fs, path, final_path, expected[i]))
def _spark_train(args, ctx):
    """Basic linear regression in a distributed TF cluster using InputMode.SPARK.

    The portion shown here builds a one-unit linear Keras model inside a
    ``MultiWorkerMirroredStrategy`` scope and wraps the Spark data feed in a
    ``tf.data.Dataset`` with auto-sharding disabled.

    Args:
      args: provides ``input_mapping``, forwarded to ``TFNode.DataFeed``.
      ctx: node context; its ``mgr`` attribute backs the data feed.
    """
    import tensorflow as tf
    from tensorflowonspark import TFNode

    tf.compat.v1.reset_default_graph()
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

    with strategy.scope():
        model = Sequential()
        model.add(Dense(1, activation='linear', input_shape=[2]))
        # `lr` is a deprecated alias of `learning_rate`; use the supported keyword.
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.2), loss='mse', metrics=['mse'])
        model.summary()

    tf_feed = TFNode.DataFeed(ctx.mgr, input_mapping=args.input_mapping)

    def rdd_generator():
        """Yield one (features, label) pair per fed record.

        Stops when the feed signals stop or hands back an empty batch.
        """
        while not tf_feed.should_stop():
            batch = tf_feed.next_batch(1)
            if len(batch['x']) == 0:
                return
            features = batch['x'][0]
            label = batch['y_'][0]
            yield (features, label)

    ds = tf.data.Dataset.from_generator(rdd_generator, (tf.float32, tf.float32), (tf.TensorShape([2]), tf.TensorShape([1])))
    # disable auto-sharding since we're feeding from an RDD generator
    options = tf.data.Options()
    compat.disable_auto_shard(options)
    ds = ds.with_options(options)
# TensorBoard callback logging into the model directory.
tb_callback = TensorBoard(
    log_dir=args.model_dir,
    histogram_freq=1,
    write_graph=True,
    write_images=True,
)

# add callbacks to save model checkpoint and tensorboard events (on worker:0 only)
if ctx.task_index == 0:
    callbacks = [ckpt_callback, tb_callback]
else:
    callbacks = None

if args.input_mode == 'tf':
    # train & validate on in-memory data
    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=args.epochs,
        verbose=1,
        validation_data=(x_test, y_test),
        callbacks=callbacks,
    )
else:  # args.input_mode == 'spark':
    # train on data read from a generator which is producing data from a Spark RDD
    tf_feed = TFNode.DataFeed(ctx.mgr)
    rdd_batches = generate_rdd_data(tf_feed, batch_size)
    model.fit_generator(
        generator=rdd_batches,
        steps_per_epoch=args.steps_per_epoch,
        epochs=args.epochs,
        verbose=1,
        validation_data=(x_test, y_test),
        callbacks=callbacks,
    )

# Only worker:0 handles the export, and only when an export directory was requested.
if args.export_dir and ctx.job_name == 'worker' and ctx.task_index == 0:
    # save a local Keras model, so we can reload it with an inferencing learning_phase
    save_model(model, "tmp_model")

    # reload the model
    K.set_learning_phase(False)
    new_model = load_model("tmp_model")

    # export a saved_model for inferencing
def main_fun(args, ctx):
    """Prepare the Spark-fed input source on this node.

    The portion shown here reads ``buffer_size``, ``batch_size`` and
    ``learning_rate`` from ``args``, opens a ``TFNode.DataFeed`` on
    ``ctx.mgr`` and defines a generator turning fed records into
    ``(image, label)`` float32 arrays.
    """
    import numpy as np
    import tensorflow as tf
    import tensorflow_datasets as tfds
    from tensorflowonspark import TFNode

    tfds.disable_progress_bar()

    BUFFER_SIZE = args.buffer_size
    BATCH_SIZE = args.batch_size
    LEARNING_RATE = args.learning_rate

    tf_feed = TFNode.DataFeed(ctx.mgr)

    def rdd_generator():
        """Yield one (image, label) pair per fed record until the feed ends."""
        while not tf_feed.should_stop():
            records = tf_feed.next_batch(1)
            if len(records) == 0:
                # An empty batch ends the generator.
                return
            record = records[0]
            # Scale pixel values by 1/255 and shape them as 28x28x1.
            pixels = np.array(record[0]).astype(np.float32) / 255.0
            pixels = np.reshape(pixels, (28, 28, 1))
            # The label becomes a one-element float32 vector.
            target = np.reshape(np.array(record[1]).astype(np.float32), (1,))
            yield (pixels, target)
def input_fn(mode, input_context=None):
if mode == tf.estimator.ModeKeys.TRAIN:
# Cluster coordinates of this node, taken from the context object.
worker_num, job_name, task_index = ctx.worker_num, ctx.job_name, ctx.task_index
IMAGE_PIXELS = 28

# Delay PS nodes a bit, since workers seem to reserve GPUs more quickly/reliably (w/o conflict)
if job_name == "ps":
    ps_delay_secs = (worker_num + 1) * 5
    time.sleep(ps_delay_secs)

# Parameters
hidden_units = 128
batch_size = args.batch_size

# Get TF cluster and server instances
use_rdma = args.protocol == 'rdma'
cluster, server = TFNode.start_cluster_server(ctx, 1, use_rdma)
def feed_dict(batch):
    """Convert a dict of named arrays into two numpy arrays of the proper type.

    Args:
      batch: mapping with an ``'image'`` entry and a ``'label'`` entry.

    Returns:
      A ``(xs, ys)`` tuple: ``xs`` holds the images as float32 divided by
      255.0, ``ys`` holds the labels cast to uint8.
    """
    raw_images, raw_labels = batch['image'], batch['label']
    scaled_images = numpy.array(raw_images).astype(numpy.float32) / 255.0
    int_labels = numpy.array(raw_labels).astype(numpy.uint8)
    return (scaled_images, int_labels)
if job_name == "ps":
server.join()
elif job_name == "worker":