# This fails with: "NotImplementedError: `fit_generator` is not supported for models compiled with tf.distribute.Strategy"
# model.fit_generator(ds, epochs=args.epochs, steps_per_epoch=steps_per_epoch, callbacks=callbacks)
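# Under tf.distribute.Strategy, the usual replacement (a sketch; not shown in this excerpt)
# is to call model.fit directly on the tf.data.Dataset:
#   model.fit(ds, epochs=args.epochs, steps_per_epoch=steps_per_epoch, callbacks=callbacks)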
if args.export_dir:
  print("exporting model to: {}".format(args.export_dir))
  compat.export_saved_model(model, args.export_dir, ctx.job_name == 'chief')

# signal the Spark data feed that no more records are needed
tf_feed.terminate()
# create a Spark DataFrame of training examples (features, labels)
rdd = self.sc.parallelize(self.train_examples, 2)
trainDF = rdd.toDF(['col1', 'col2'])
# train and export model
args = {}
estimator = TFEstimator(_spark_train, args) \
    .setInputMapping({'col1': 'x', 'col2': 'y_'}) \
    .setModelDir(self.model_dir) \
    .setExportDir(self.export_dir) \
    .setClusterSize(self.num_workers) \
    .setNumPS(0) \
    .setBatchSize(1) \
    .setEpochs(1)
model = estimator.fit(trainDF)
self.assertTrue(os.path.isdir(self.export_dir))
# create a Spark DataFrame of test examples (features, labels)
testDF = self.spark.createDataFrame(self.test_examples, ['c1', 'c2'])
# test saved_model using exported signature
model.setTagSet('serve') \
    .setSignatureDefKey('serving_default')
elif args.format == "csv":
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)
df = spark.createDataFrame(dataRDD, ['image', 'label'])
else:
raise Exception("Unsupported format: {}".format(args.format))
# Pipeline API
if args.train:
  # train a model using Spark Estimator fitted to a DataFrame
  print("{0} ===== Estimator.fit()".format(datetime.now().isoformat()))
  # dummy tf args (from imagenet/inception example)
  tf_args = {'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94}
  estimator = TFEstimator(mnist_dist_pipeline.map_fun, tf_args) \
      .setInputMapping({'image': 'image', 'label': 'label'}) \
      .setModelDir(args.model_dir) \
      .setExportDir(args.export_dir) \
      .setClusterSize(args.cluster_size) \
      .setNumPS(args.num_ps) \
      .setProtocol(args.protocol) \
      .setTensorboard(args.tensorboard) \
      .setEpochs(args.epochs) \
      .setBatchSize(args.batch_size) \
      .setSteps(args.steps)
  model = estimator.fit(df)
else:
  # use a previously trained/exported model
  model = TFModel(args) \
      .setExportDir(args.export_dir) \
      .setBatchSize(args.batch_size)
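# Run distributed inference via the Spark ML Model API and save the predictions.
# Note: writing to args.output is an assumption here, based on the --output argument
# defined in the following example; adjust to your own output path.
preds = model.transform(df)
preds.write.json(args.output)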
parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1)
parser.add_argument("--output", help="HDFS path to save output predictions", type=str)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("--train_dir", help="HDFS path to save/load model during train/inference", type=str)
parser.add_argument("--tfrecord_dir", help="HDFS path to temporarily save DataFrame to disk", type=str)
parser.add_argument("--train_data", help="HDFS path to training data", type=str)
parser.add_argument("--validation_data", help="HDFS path to validation data", type=str)
(args,rem) = parser.parse_known_args()
input_mode = TFCluster.InputMode.SPARK if args.input_mode == 'spark' else TFCluster.InputMode.TENSORFLOW
print("{0} ===== Start".format(datetime.now().isoformat()))
df = dfutil.loadTFRecords(sc, args.train_data, binary_features=['image/encoded'])
estimator = TFEstimator(main_fun, sys.argv, export_fn=inception_export.export) \
    .setModelDir(args.train_dir) \
    .setExportDir(args.export_dir) \
    .setTFRecordDir(args.tfrecord_dir) \
    .setClusterSize(args.cluster_size) \
    .setNumPS(args.num_ps) \
    .setInputMode(TFCluster.InputMode.TENSORFLOW) \
    .setTensorboard(args.tensorboard)
print("{0} ===== Train".format(datetime.now().isoformat()))
model = estimator.fit(df)
print("{0} ===== Inference".format(datetime.now().isoformat()))
df = dfutil.loadTFRecords(sc, args.validation_data, binary_features=['image/encoded'])
preds = model.setTagSet(tf.saved_model.tag_constants.SERVING) \
    .setSignatureDefKey(tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY) \
    .setInputMapping({'image/encoded': 'jpegs', 'image/class/label': 'labels'})
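# The setter chain typically continues with an output mapping and a transform() call
# before preds holds actual predictions; a sketch with hypothetical tensor/column names:
#   .setOutputMapping({'top_5_acc': 'prediction'}) \
#   .transform(df)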
elif args.format == "csv":
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [int(float(x)) for x in ln.split(',')])
dataRDD = images.zip(labels)
df = spark.createDataFrame(dataRDD, ['image', 'label'])
else:
raise Exception("Unsupported format: {}".format(args.format))
# Pipeline API
if args.train:
  # train a model using Spark Estimator fitted to a DataFrame
  print("{0} ===== Estimator.fit()".format(datetime.now().isoformat()))
  # dummy tf args (from imagenet/inception example)
  tf_args = {'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94}
  estimator = TFEstimator(mnist_dist_pipeline.map_fun, args, export_fn=mnist_dist_pipeline.export_fun) \
      .setModelDir(args.model_dir) \
      .setExportDir(args.export_dir) \
      .setClusterSize(args.cluster_size) \
      .setNumPS(args.num_ps) \
      .setDriverPSNodes(args.driver_ps_nodes) \
      .setInputMode(TFCluster.InputMode.TENSORFLOW) \
      .setTFRecordDir(args.tfrecord_dir) \
      .setProtocol(args.protocol) \
      .setReaders(args.readers) \
      .setTensorboard(args.tensorboard) \
      .setEpochs(args.epochs) \
      .setBatchSize(args.batch_size) \
      .setSteps(args.steps)
  model = estimator.fit(df)
else:
  # use a previously trained/exported model
if args.format == 'tfr':
  # load TFRecords as a DataFrame
  df = dfutil.loadTFRecords(sc, args.images_labels)
else:  # args.format == 'csv':
  # create RDD of input data
  def parse(ln):
    vec = [int(x) for x in ln.split(',')]
    return (vec[1:], vec[0])
  images_labels = sc.textFile(args.images_labels).map(parse)
  df = spark.createDataFrame(images_labels, ['image', 'label'])
df.show()
if args.mode == 'train':
  estimator = TFEstimator(main_fun, args) \
      .setInputMapping({'image': 'image', 'label': 'label'}) \
      .setModelDir(args.model_dir) \
      .setExportDir(args.export_dir) \
      .setClusterSize(args.cluster_size) \
      .setTensorboard(args.tensorboard) \
      .setEpochs(args.epochs) \
      .setBatchSize(args.batch_size) \
      .setGraceSecs(60)
  model = estimator.fit(df)
else:  # args.mode == 'inference':
  # using a trained/exported model
  model = TFModel(args) \
      .setInputMapping({'image': 'conv2d_input'}) \
      .setOutputMapping({'dense_1': 'prediction'}) \
      .setExportDir(args.export_dir) \
      .setBatchSize(args.batch_size)
elif args.format == "csv":
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)
df = spark.createDataFrame(dataRDD, ['image', 'label'])
else:
raise Exception("Unsupported format: {}".format(args.format))
# Pipeline API
if args.train:
  # train a model using Spark Estimator fitted to a DataFrame
  print("{0} ===== Estimator.fit()".format(datetime.now().isoformat()))
  # dummy tf args (from imagenet/inception example)
  tf_args = {'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94}
  estimator = TFEstimator(mnist_dist_pipeline.map_fun, tf_args) \
      .setInputMapping({'image': 'image', 'label': 'label'}) \
      .setModelDir(args.model_dir) \
      .setExportDir(args.export_dir) \
      .setClusterSize(args.cluster_size) \
      .setNumPS(args.num_ps) \
      .setDriverPSNodes(args.driver_ps_nodes) \
      .setProtocol(args.protocol) \
      .setTensorboard(args.tensorboard) \
      .setEpochs(args.epochs) \
      .setBatchSize(args.batch_size) \
      .setSteps(args.steps)
  model = estimator.fit(df)
else:
  # use a previously trained/exported model
  model = TFModel(args) \
      .setExportDir(args.export_dir)
if args.format == 'tfr':
  # load TFRecords as a DataFrame
  df = dfutil.loadTFRecords(sc, args.images_labels)
else:  # args.format == 'csv':
  # create RDD of input data
  def parse(ln):
    vec = [int(x) for x in ln.split(',')]
    return (vec[1:], vec[0])
  images_labels = sc.textFile(args.images_labels).map(parse)
  df = spark.createDataFrame(images_labels, ['image', 'label'])
df.show()
if args.mode == 'train':
  estimator = TFEstimator(main_fun, args) \
      .setInputMapping({'image': 'image', 'label': 'label'}) \
      .setModelDir(args.model_dir) \
      .setExportDir(args.export_dir) \
      .setClusterSize(args.cluster_size) \
      .setTensorboard(args.tensorboard) \
      .setEpochs(args.epochs) \
      .setBatchSize(args.batch_size) \
      .setGraceSecs(60)
  model = estimator.fit(df)
else:  # args.mode == 'inference':
  # using a trained/exported model
  model = TFModel(args) \
      .setInputMapping({'image': 'features'}) \
      .setOutputMapping({'logits': 'prediction'}) \
      .setExportDir(args.export_dir) \
      .setBatchSize(args.batch_size)
def __init__(self, train_fn, tf_args):
  super(TFEstimator, self).__init__()
  self.train_fn = train_fn
  self.args = Namespace(tf_args)
  self._setDefault(input_mapping={},
                   cluster_size=1,
                   num_ps=0,
                   driver_ps_nodes=False,
                   master_node='chief',
                   protocol='grpc',
                   tensorboard=False,
                   model_dir=None,
                   export_dir=None,
                   tfrecord_dir=None,
                   batch_size=100,
                   epochs=1,
                   readers=1,
                   steps=1000,