def test_inputmode_spark(self):
  """Distributed TF cluster w/ InputMode.SPARK"""
  def _map_fun(args, ctx):
    import tensorflow as tf
    tf_feed = TFNode.DataFeed(ctx.mgr, False)
    while not tf_feed.should_stop():
      batch = tf_feed.next_batch(batch_size=10)
      print("batch: {}".format(batch))
      squares = tf.math.square(batch)
      print("squares: {}".format(squares))
      tf_feed.batch_results(squares.numpy())

  input = [[x] for x in range(1000)]  # set up input as tensors of shape [1] to match placeholder
  rdd = self.sc.parallelize(input, 10)
  cluster = TFCluster.run(self.sc, _map_fun, tf_args={}, num_executors=self.num_workers, num_ps=0, input_mode=TFCluster.InputMode.SPARK)
  rdd_out = cluster.inference(rdd)
  rdd_sum = rdd_out.sum()
  self.assertEqual(rdd_sum, sum([x * x for x in range(1000)]))
  cluster.shutdown()

# Excerpt from an MNIST Spark driver: build dataRDD from TFRecords or CSV, then train or run inference.
if args.format == "tfr":
  # `images` is an RDD of serialized TFRecords, loaded earlier in the driver (e.g. via sc.newAPIHadoopFile)
  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)

  dataRDD = images.map(lambda x: toNumpy(bytes(x[0])))
else:  # args.format == "csv":
  images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
  labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  print("zipping images and labels")
  dataRDD = images.zip(labels)

cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
if args.mode == "train":
  cluster.train(dataRDD, args.epochs)
else:
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)

cluster.shutdown()

print("{0} ===== Stop".format(datetime.now().isoformat()))
print("{0} ===== Start".format(datetime.now().isoformat()))
def parse(ln):
lbl, img = ln.split('|')
image = [int(x) for x in img.split(',')]
label = numpy.zeros(10)
label[int(lbl)] = 1.0
return (image, label)
stream = ssc.textFileStream(args.images)
imageRDD = stream.map(lambda ln: parse(ln))
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
if args.mode == "train":
cluster.train(imageRDD)
else:
labelRDD = cluster.inference(imageRDD)
labelRDD.saveAsTextFiles(args.output)
ssc.start()
cluster.shutdown(ssc)
print("{0} ===== Stop".format(datetime.now().isoformat()))

# Same driver pattern, but using mnist_dist_dataset.map_fun.
if args.format == "tfr":
  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)

  dataRDD = images.map(lambda x: toNumpy(bytes(x[0])))  # bytes(), since ParseFromString expects a byte string
else:  # args.format == "csv":
  images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
  labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  print("zipping images and labels")
  dataRDD = images.zip(labels)

cluster = TFCluster.run(sc, mnist_dist_dataset.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
if args.mode == "train":
  cluster.train(dataRDD, args.epochs)
else:
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)

cluster.shutdown()

print("{0} ===== Stop".format(datetime.now().isoformat()))
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
if args.input_mode == 'tf':
# for TENSORFLOW mode, each node will load/train/infer entire dataset in memory per original example
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW, log_dir=args.model_dir, master_node='master')
cluster.shutdown()
else: # 'spark'
# for SPARK mode, just use CSV format as an example
images = sc.textFile(args.images).map(lambda ln: [float(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)
if args.mode == 'train':
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.SPARK, log_dir=args.model_dir, master_node='master')
cluster.train(dataRDD, args.epochs)
cluster.shutdown()
else:
# Note: using "parallel" inferencing, not "cluster"
# each node loads the model and runs independently of others
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, 0, args.tensorboard, TFCluster.InputMode.SPARK, log_dir=args.model_dir)
resultRDD = cluster.inference(dataRDD)
resultRDD.saveAsTextFile(args.output)
cluster.shutdown()
parser.add_argument("--images_labels", help="path to MNIST images and labels in parallelized format")
parser.add_argument("--model_dir", help="path to save checkpoint", default="mnist_model")
parser.add_argument("--export_dir", help="path to export saved_model", default="mnist_export")
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
# create RDD of input data
def parse(ln):
vec = [int(x) for x in ln.split(',')]
return (vec[1:], vec[0])
images_labels = sc.textFile(args.images_labels).map(parse)
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, num_ps=0, tensorboard=args.tensorboard, input_mode=TFCluster.InputMode.SPARK, master_node='chief')
# Note: need to feed extra data to ensure that each worker receives sufficient data to complete epochs
# to compensate for variability in partition sizes and spark scheduling
cluster.train(images_labels, args.epochs)
cluster.shutdown()
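
# The drivers above only show the Spark side of InputMode.SPARK; the `main_fun` they hand to
# TFCluster.run pulls the fed partitions through a TFNode.DataFeed on each executor (the same
# class used in the unit test at the top). A minimal sketch of such a worker function follows;
# the Keras model, batch size of 64, and the 784-pixel/label record layout are illustrative
# assumptions, not the actual code behind these excerpts.
def main_fun(args, ctx):
  # runs on every executor; ctx carries this node's role and the feed manager
  import numpy as np
  import tensorflow as tf
  from tensorflowonspark import TFNode

  # hypothetical model purely for illustration; the real main_fun in these examples is defined elsewhere
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
      tf.keras.layers.Dense(10, activation='softmax')])
  model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

  tf_feed = TFNode.DataFeed(ctx.mgr)     # reads the partitions pushed by cluster.train(...)
  while not tf_feed.should_stop():
    batch = tf_feed.next_batch(64)       # list of (features, label) tuples from the RDD
    if len(batch) == 0:
      break
    images = np.array([x[0] for x in batch], dtype=np.float32) / 255.0
    labels = np.array([x[1] for x in batch], dtype=np.int64)
    model.train_on_batch(images, labels)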
parser.add_argument("--learning_rate", help="learning rate", type=float, default=1e-3)
parser.add_argument("--model_dir", help="path to save checkpoint", default="mnist_model")
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
# create RDD of input data
def parse(ln):
vec = [int(x) for x in ln.split(',')]
return (vec[1:], vec[0])
stream = ssc.textFileStream(args.images_labels)
images_labels = stream.map(parse)
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, num_ps=1, tensorboard=args.tensorboard, input_mode=TFCluster.InputMode.SPARK, log_dir=args.model_dir, master_node='chief')
cluster.train(images_labels, feed_timeout=86400) # extend feed timeout to 24hrs for streaming data to arrive
ssc.start()
cluster.shutdown(ssc)

Args:
  :dataset: A Spark DataFrame with columns that will be mapped to TensorFlow tensors.

Returns:
  A TFModel representing the trained model, backed on disk by a TensorFlow checkpoint or saved_model.
"""
sc = SparkContext.getOrCreate()

logger.info("===== 1. train args: {0}".format(self.args))
logger.info("===== 2. train params: {0}".format(self._paramMap))
local_args = self.merge_args_params()
logger.info("===== 3. train args + params: {0}".format(local_args))

tf_args = self.args.argv if self.args.argv else local_args
cluster = TFCluster.run(sc, self.train_fn, tf_args, local_args.cluster_size, local_args.num_ps,
                        local_args.tensorboard, TFCluster.InputMode.SPARK,
                        master_node=local_args.master_node, driver_ps_nodes=local_args.driver_ps_nodes)

# feed data, using a deterministic order for input columns (lexicographic by key)
input_cols = sorted(self.getInputMapping())
cluster.train(dataset.select(input_cols).rdd, local_args.epochs)
cluster.shutdown(grace_secs=self.getGraceSecs())

return self._copyValues(TFModel(self.args))
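
# The _fit body above is what executes when a caller invokes fit() on the TensorFlowOnSpark
# Spark ML estimator. A minimal usage sketch under assumed names follows: `train_fun`, `args`,
# and `df` are placeholders, and only a few common TFEstimator setters are shown, so treat it
# as an outline rather than a verbatim example from this codebase.
from tensorflowonspark.pipeline import TFEstimator

# `train_fun` is any (args, ctx) training function; `args` its parsed arguments;
# `df` a DataFrame whose 'image' and 'label' columns feed the tensors named in the mapping.
estimator = TFEstimator(train_fun, args) \
    .setInputMapping({'image': 'image', 'label': 'label'}) \
    .setClusterSize(args.cluster_size) \
    .setNumPS(args.num_ps) \
    .setEpochs(args.epochs) \
    .setModelDir(args.model_dir)

model = estimator.fit(df)   # runs the _fit() shown above and returns a TFModel
# the returned TFModel can then be given input/output mappings and used via model.transform(df)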

# With driver_ps_nodes enabled, parameter server tasks run locally on the driver rather than on executors.
def _start_ps(node_index):
  logger.info("starting ps node locally %d" % node_index)
  TFSparkNode.run(map_fun,
                  tf_args,
                  cluster_meta,
                  tensorboard,
                  log_dir,
                  queues,
                  background=(input_mode == InputMode.SPARK))([node_index])

for i in cluster_template['ps']:
  # (excerpt truncated: each ps node index is started via _start_ps)
parser.add_argument("--epochs", help="number of epochs", type=int, default=0)
parser.add_argument("--input_data", help="HDFS path to input dataset")
parser.add_argument("--input_mode", help="method to ingest data: (spark|tf)", choices=["spark","tf"], default="tf")
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
(args,rem) = parser.parse_known_args()
input_mode = TFCluster.InputMode.SPARK if args.input_mode == 'spark' else TFCluster.InputMode.TENSORFLOW
print("{0} ===== Start".format(datetime.now().isoformat()))
sc = SparkContext(conf=SparkConf().setAppName('imagenet_distributed_train'))
num_executors = int(sc._conf.get("spark.executor.instances"))
num_ps = 1
cluster = TFCluster.run(sc, main_fun, sys.argv, num_executors, num_ps, args.tensorboard, input_mode)
if input_mode == TFCluster.InputMode.SPARK:
dataRDD = sc.newAPIHadoopFile(args.input_data, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
keyClass="org.apache.hadoop.io.BytesWritable",
valueClass="org.apache.hadoop.io.NullWritable")
cluster.train(dataRDD, args.epochs)
cluster.shutdown()
print("{0} ===== Stop".format(datetime.now().isoformat()))