parser.add_argument("--images", help="HDFS path to MNIST images in parallelized CSV format")
parser.add_argument("--input_mode", help="input mode (tf|spark)", default="tf")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized CSV format")
parser.add_argument("--model_dir", help="directory to write model checkpoints")
parser.add_argument("--mode", help="(train|inference)", default="train")
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=1)
parser.add_argument("--steps", help="max number of steps to train", type=int, default=2000)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
if args.input_mode == 'tf':
  # for TENSORFLOW mode, each node will load/train/infer the entire dataset in memory per the original example
  cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW, log_dir=args.model_dir, master_node='master')
  cluster.shutdown()
else:  # 'spark'
  # for SPARK mode, just use CSV format as an example
  images = sc.textFile(args.images).map(lambda ln: [float(x) for x in ln.split(',')])
  labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  dataRDD = images.zip(labels)
  if args.mode == 'train':
    cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.SPARK, log_dir=args.model_dir, master_node='master')
    cluster.train(dataRDD, args.epochs)
    cluster.shutdown()
  else:
    # Note: using "parallel" inferencing, not "cluster"
    # each node loads the model and runs independently of the others
    cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, 0, args.tensorboard, TFCluster.InputMode.SPARK, log_dir=args.model_dir)
    resultRDD = cluster.inference(dataRDD)
    resultRDD.saveAsTextFile(args.output)
    cluster.shutdown()  # release the TF cluster once inference output has been written
raise Exception("Unsupported format: {}".format(args.format))
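# --- Illustrative sketch (not part of the snippets above): TFCluster.run() ships the supplied
# main_fun to every executor and invokes it as main_fun(args, ctx), where ctx describes the
# node's role in the TensorFlow cluster. The attribute names below (worker_num, job_name,
# task_index) follow TensorFlowOnSpark's TFNodeContext; the body is only a placeholder.
def main_fun(args, ctx):
  print("node {}: job_name={}, task_index={}".format(ctx.worker_num, ctx.job_name, ctx.task_index))
  if ctx.job_name == "ps":
    pass  # parameter servers typically block inside the TF server join loop
  else:
    pass  # workers build the model/graph and run training or inference here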
# Pipeline API
if args.train:
  # train a model using Spark Estimator fitted to a DataFrame
  print("{0} ===== Estimator.fit()".format(datetime.now().isoformat()))
  # dummy tf args (from imagenet/inception example)
  tf_args = {'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94}
  estimator = TFEstimator(mnist_dist_pipeline.map_fun, args, export_fn=mnist_dist_pipeline.export_fun) \
      .setModelDir(args.model_dir) \
      .setExportDir(args.export_dir) \
      .setClusterSize(args.cluster_size) \
      .setNumPS(args.num_ps) \
      .setDriverPSNodes(args.driver_ps_nodes) \
      .setInputMode(TFCluster.InputMode.TENSORFLOW) \
      .setTFRecordDir(args.tfrecord_dir) \
      .setProtocol(args.protocol) \
      .setReaders(args.readers) \
      .setTensorboard(args.tensorboard) \
      .setEpochs(args.epochs) \
      .setBatchSize(args.batch_size) \
      .setSteps(args.steps)
  model = estimator.fit(df)
else:
  # use a previously trained/exported model
  model = TFModel(args) \
      .setExportDir(args.export_dir) \
      .setBatchSize(args.batch_size)
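# --- Illustrative sketch (assumption: the fitted/loaded model follows TensorFlowOnSpark's
# pipeline API, where DataFrame-column-to-tensor mappings are configured before transform()).
# The tag/signature values are the SavedModel defaults; the column/tensor names ('image',
# 'prediction', 'col_out') are placeholders, not taken from the snippet above.
model.setTagSet("serve") \
    .setSignatureDefKey("serving_default") \
    .setInputMapping({"image": "image"}) \
    .setOutputMapping({"prediction": "col_out"})
preds = model.transform(df)    # Spark DataFrame with the mapped output column(s)
preds.write.json(args.output)  # assumes an --output argument like the one in the first snippet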
# NO INFERENCING
if args.inference_mode == 'none':
  self.sc.cancelAllJobs()
  self.sc.stop()
  sys.exit(1)
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
# wait for Spark Streaming termination or TF app completion for InputMode.TENSORFLOW
if ssc is not None:
  # Spark Streaming
  while not ssc.awaitTerminationOrTimeout(1):
    if self.server.done:
      logger.info("Server done, stopping StreamingContext")
      ssc.stop(stopSparkContext=False, stopGraceFully=True)
      break
elif self.input_mode == InputMode.TENSORFLOW:
  # in TENSORFLOW mode, there is no "data feeding" job, only a "start" job, so we must wait for the TensorFlow workers
  # to complete all tasks, while accounting for any PS tasks which run indefinitely.
  count = 0
  while count < 3:
    st = self.sc.statusTracker()
    jobs = st.getActiveJobsIds()
    if len(jobs) == 0:
      break
    stages = st.getActiveStageIds()
    for i in stages:
      si = st.getStageInfo(i)
      if si.numActiveTasks == len(ps_list) + len(eval_list):
        # if we only have PS tasks left, check that we see this condition a couple times
        count += 1
    time.sleep(5)
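# --- Illustrative sketch (assumed driver-side usage, not part of the shutdown() internals above):
# for a Spark Streaming job, the driver passes the StreamingContext so shutdown() can wait on it
# via awaitTerminationOrTimeout() as shown; dataDStream and ssc are placeholders.
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, False, TFCluster.InputMode.SPARK)
cluster.train(dataDStream)  # feed the stream to the TF workers
ssc.start()
cluster.shutdown(ssc)       # returns once the stream terminates or the TF app completes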
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("--epochs", help="number of epochs of training data", type=int, default=20)
parser.add_argument("--export_dir", help="directory to export saved_model")
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized CSV format")
parser.add_argument("--input_mode", help="input mode (tf|spark)", default="tf")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized CSV format")
parser.add_argument("--model_dir", help="directory to write model checkpoints")
parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=1)
parser.add_argument("--steps_per_epoch", help="number of steps per epoch", type=int, default=300)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
if args.input_mode == 'tf':
  cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW, log_dir=args.model_dir)
else:  # args.input_mode == 'spark'
  cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.SPARK, log_dir=args.model_dir)
  images = sc.textFile(args.images).map(lambda ln: [float(x) for x in ln.split(',')])
  labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  dataRDD = images.zip(labels)
  cluster.train(dataRDD, args.epochs)
cluster.shutdown()
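# --- Illustrative sketch (assumed worker-side code for InputMode.SPARK): the rows of dataRDD are
# pushed to the executors, and main_fun pulls them through TFNode.DataFeed. args.batch_size and
# the (image, label) row layout (from images.zip(labels) above) are assumptions.
from tensorflowonspark import TFNode

def main_fun(args, ctx):
  tf_feed = TFNode.DataFeed(ctx.mgr)               # queue fed by cluster.train(dataRDD, ...)
  while not tf_feed.should_stop():
    batch = tf_feed.next_batch(args.batch_size)    # list of (image, label) rows
    if len(batch) == 0:
      break
    # ... convert the batch to tensors and run one training step here ...
  tf_feed.terminate()  # signal that no more data is needed (optional if the feed is exhausted)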
parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", help="number of records per batch", type=int, default=64)
parser.add_argument("--buffer_size", help="size of shuffle buffer", type=int, default=10000)
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("--data_format", help="data format (tfos|tfds)", type=str, choices=["tfos", "tfds"], default="tfos")
parser.add_argument("--epochs", help="number of epochs", type=int, default=3)
parser.add_argument("--images_labels", help="HDFS path to MNIST image_label files in parallelized format")
parser.add_argument("--model_dir", help="path to save model/checkpoint", default="mnist_model")
parser.add_argument("--export_dir", help="path to export saved_model", default="mnist_export")
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, num_ps=0, tensorboard=args.tensorboard, input_mode=TFCluster.InputMode.TENSORFLOW, master_node='chief')
cluster.shutdown()
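# --- Illustrative sketch (assumed, modeled on the TF2 Keras examples): with num_ps=0 and
# master_node='chief', each executor runs main_fun as one task of a MultiWorkerMirroredStrategy
# cluster (TF_CONFIG is populated before main_fun is invoked). The model below is a placeholder.
import tensorflow as tf

def main_fun(args, ctx):
  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')])
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
  # ... build a tf.data.Dataset from args.images_labels and call model.fit(...) here ...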
if not tf.gfile.Exists(FLAGS.train_dir):
  tf.gfile.MakeDirs(FLAGS.train_dir)
inception_distributed_train.train(server.target, dataset, cluster_spec, ctx)
if __name__ == '__main__':
  # parse arguments needed by the Spark driver
  import argparse
  parser = argparse.ArgumentParser()
  parser.add_argument("--epochs", help="number of epochs", type=int, default=0)
  parser.add_argument("--input_data", help="HDFS path to input dataset")
  parser.add_argument("--input_mode", help="method to ingest data: (spark|tf)", choices=["spark", "tf"], default="tf")
  parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
  (args, rem) = parser.parse_known_args()
  input_mode = TFCluster.InputMode.SPARK if args.input_mode == 'spark' else TFCluster.InputMode.TENSORFLOW
  print("{0} ===== Start".format(datetime.now().isoformat()))
  sc = SparkContext(conf=SparkConf().setAppName('imagenet_distributed_train'))
  num_executors = int(sc._conf.get("spark.executor.instances"))
  num_ps = 1
  cluster = TFCluster.run(sc, main_fun, sys.argv, num_executors, num_ps, args.tensorboard, input_mode)
  if input_mode == TFCluster.InputMode.SPARK:
    dataRDD = sc.newAPIHadoopFile(args.input_data, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                                  keyClass="org.apache.hadoop.io.BytesWritable",
                                  valueClass="org.apache.hadoop.io.NullWritable")
    cluster.train(dataRDD, args.epochs)
  cluster.shutdown()
  print("{0} ===== Stop".format(datetime.now().isoformat()))
# arguments for Spark and TFoS
parser = argparse.ArgumentParser()
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=executors)
parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=1)
(args, remainder) = parser.parse_known_args()
# construct an ARGV (with script name as first element) from remaining args and pass it to the TF processes on executors
remainder.insert(0, __file__)
print("spark args:", args)
print("tf args:", remainder)
num_workers = args.cluster_size - args.num_ps
print("===== num_executors={}, num_workers={}, num_ps={}".format(args.cluster_size, num_workers, args.num_ps))
cluster = TFCluster.run(sc, main_fun, remainder, args.cluster_size, args.num_ps, False, TFCluster.InputMode.TENSORFLOW, master_node='master')
cluster.shutdown()
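# --- Illustrative sketch (assumed): because `remainder` (with the script name prepended) is passed
# as the tf_args argument above, main_fun receives it as a plain argv list and can re-parse it with
# the TensorFlow application's own flag definitions on each executor. "--train_dir" is hypothetical.
def main_fun(argv, ctx):
  import argparse
  parser = argparse.ArgumentParser()
  parser.add_argument("--train_dir", default="/tmp/train")  # hypothetical TF-side flag
  tf_args = parser.parse_args(argv[1:])                     # skip the script name inserted above
  print("worker {} parsed tf args: {}".format(ctx.worker_num, tf_args))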
num_executors = int(executors) if executors is not None else 1
parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", help="number of records per batch", type=int, default=64)
parser.add_argument("--buffer_size", help="size of shuffle buffer", type=int, default=10000)
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("--epochs", help="number of epochs", type=int, default=3)
parser.add_argument("--model_dir", help="path to save model/checkpoint", default="mnist_model")
parser.add_argument("--export_dir", help="path to export saved_model", default="mnist_export")
parser.add_argument("--steps_per_epoch", help="number of steps per epoch", type=int, default=469)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, num_ps=0, tensorboard=args.tensorboard, input_mode=TFCluster.InputMode.TENSORFLOW, master_node='chief')
cluster.shutdown()
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("--data_dir", help="path to MNIST data", default="MNIST-data")
parser.add_argument("--model", help="path to save model/checkpoint", default="mnist_model")
parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1)
parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
cluster = TFCluster.run(sc, main, args, args.cluster_size, args.num_ps, tensorboard=args.tensorboard, input_mode=TFCluster.InputMode.TENSORFLOW, log_dir=args.model, master_node='master')
cluster.shutdown()
    if FLAGS.run_once:
      break
    time.sleep(FLAGS.eval_interval_secs)
#cifar10.maybe_download_and_extract()
if tf.gfile.Exists(FLAGS.eval_dir):
  tf.gfile.DeleteRecursively(FLAGS.eval_dir)
tf.gfile.MakeDirs(FLAGS.eval_dir)
evaluate()
if __name__ == '__main__':
sc = SparkContext(conf=SparkConf().setAppName("cifar10_eval"))
num_executors = int(sc._conf.get("spark.executor.instances"))
num_ps = 0
cluster = TFCluster.run(sc, main_fun, sys.argv, num_executors, num_ps, False, TFCluster.InputMode.TENSORFLOW)
cluster.shutdown()