Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
.setExportDir(args.export_dir) \
.setClusterSize(args.cluster_size) \
.setNumPS(args.num_ps) \
.setDriverPSNodes(args.driver_ps_nodes) \
.setInputMode(TFCluster.InputMode.TENSORFLOW) \
.setTFRecordDir(args.tfrecord_dir) \
.setProtocol(args.protocol) \
.setReaders(args.readers) \
.setTensorboard(args.tensorboard) \
.setEpochs(args.epochs) \
.setBatchSize(args.batch_size) \
.setSteps(args.steps)
# NOTE(review): this estimator uses InputMode.TENSORFLOW with a TFRecord directory, so
# presumably the TF workers read args.tfrecord_dir themselves rather than being fed df — confirm.
# Train; `model` is then configured for inference below.
model = estimator.fit(df)
else:
# use a previously trained/exported model
model = TFModel(args) \
.setExportDir(args.export_dir) \
.setBatchSize(args.batch_size)
# NO INFERENCING
if args.inference_mode == 'none':
sys.exit(0)
# INFER USING EXPORTED SIGNATURES OF TENSORFLOW SAVED_MODEL
elif args.inference_mode == 'signature':
model.setModelDir(None) # clear model_dir: this mode loads the saved_model from export_dir instead
model.setExportDir(args.export_dir) # load saved_model from args.export_dir
model.setTagSet(tf.saved_model.tag_constants.SERVING) # using default SERVING tagset
model.setInputMapping({'image': 'image'}) # map DataFrame 'image' column to the 'image' input tensor alias of signature
if args.inference_output == 'predictions':
model.setSignatureDefKey(tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY) # default signature def key, i.e. 'serving_default'
model.setOutputMapping({'prediction': 'col_out'}) # map 'prediction' output tensor alias to output DataFrame 'col_out' column
# Tail of a helper whose definition is not visible here (presumably toNumpy, used on the next line).
return (image, label)
# Convert the first field of each record with toNumpy (defined outside this excerpt).
dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:
if args.format == "csv":
# CSV: one record per line; image values are parsed as int, label values as float.
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
else: # args.format == "pickle":
# SECURITY NOTE(review): sc.pickleFile unpickles the files at args.images / args.labels.
# Unpickling can execute arbitrary code, so only point these arguments at trusted data.
images = sc.pickleFile(args.images)
labels = sc.pickleFile(args.labels)
print("zipping images and labels")
# NOTE(review): RDD.zip pairs elements positionally and expects both RDDs to have the same
# number of partitions and elements per partition — confirm the image/label inputs line up.
dataRDD = images.zip(labels)
# Pipeline API
# Expose the two elements of each dataRDD record as DataFrame columns 'col1' and 'col2'.
df = spark.createDataFrame(dataRDD, ['col1', 'col2'])
model = TFModel(args) \
.setBatchSize(args.batch_size)
#
# Using saved_model w/ signature defs and tensor aliases
#
# prediction
# Use the default serving signature: DataFrame column 'col1' feeds the 'image' input alias,
# and the 'prediction' output alias is written to output DataFrame column 'col_out'.
model.setTagSet(tf.saved_model.tag_constants.SERVING) \
.setExportDir(args.export_dir) \
.setSignatureDefKey(tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY) \
.setInputMapping({'col1':'image'}) \
.setOutputMapping({'prediction':'col_out'})
# featurize
# (commented-out alternative configuration; the rest of it is cut off in this excerpt)
# model.setTagSet(tf.saved_model.tag_constants.SERVING) \
# .setExportDir(args.export_dir) \
# Extra arguments handed to mnist_dist_pipeline.map_fun (not shown here);
# judging by their names, learning-rate decay settings.
tf_args = {'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94}
estimator = TFEstimator(mnist_dist_pipeline.map_fun, tf_args) \
.setInputMapping({'image': 'image', 'label': 'label'}) \
.setModelDir(args.model_dir) \
.setExportDir(args.export_dir) \
.setClusterSize(args.cluster_size) \
.setNumPS(args.num_ps) \
.setProtocol(args.protocol) \
.setTensorboard(args.tensorboard) \
.setEpochs(args.epochs) \
.setBatchSize(args.batch_size) \
.setSteps(args.steps)
# Train on df with the settings above.
model = estimator.fit(df)
else:
# use a previously trained/exported model
model = TFModel(args) \
.setExportDir(args.export_dir) \
.setBatchSize(args.batch_size)
# NO INFERENCING
if args.inference_mode == 'none':
sys.exit(0)
# INFER FROM TENSORFLOW CHECKPOINT
elif args.inference_mode == 'checkpoint':
model.setModelDir(args.model_dir) # load model from checkpoint at args.model_dir
model.setExportDir(None) # clear export_dir; this mode loads from the checkpoint in model_dir instead
model.setInputMapping({'image': 'x'}) # map DataFrame 'image' column to the 'x' input tensor
if args.inference_output == 'predictions':
model.setOutputMapping({'prediction': 'col_out'}) # map 'prediction' output tensor to output DataFrame 'col_out' column
else: # args.inference_output == 'features':
model.setOutputMapping({'prediction': 'col_out', 'Relu': 'col_out2'}) # add 'Relu' output tensor to output DataFrame 'col_out2' column
# Estimator for mnist_dist_pipeline.map_fun; unlike a plain set-up, it also calls setDriverPSNodes.
estimator = TFEstimator(mnist_dist_pipeline.map_fun, tf_args) \
.setInputMapping({'image':'image', 'label':'label'}) \
.setModelDir(args.model_dir) \
.setExportDir(args.export_dir) \
.setClusterSize(args.cluster_size) \
.setNumPS(args.num_ps) \
.setDriverPSNodes(args.driver_ps_nodes) \
.setProtocol(args.protocol) \
.setTensorboard(args.tensorboard) \
.setEpochs(args.epochs) \
.setBatchSize(args.batch_size) \
.setSteps(args.steps)
# Train on df with the settings above.
model = estimator.fit(df)
else:
# use a previously trained/exported model
model = TFModel(args) \
.setExportDir(args.export_dir) \
.setBatchSize(args.batch_size)
# NO INFERENCING
if args.inference_mode == 'none':
sys.exit(0)
# INFER FROM TENSORFLOW CHECKPOINT
elif args.inference_mode == 'checkpoint':
model.setModelDir(args.model_dir) # load model from checkpoint at args.model_dir
model.setExportDir(None) # clear export_dir; this mode loads from the checkpoint in model_dir instead
model.setInputMapping({'image':'x'}) # map DataFrame 'image' column to the 'x' input tensor
if args.inference_output == 'predictions':
model.setOutputMapping({'prediction':'col_out'}) # map 'prediction' output tensor to output DataFrame 'col_out' column
else: # args.inference_output == 'features':
model.setOutputMapping({'prediction':'col_out', 'Relu':'col_out2'}) # add 'Relu' output tensor to output DataFrame 'col_out2' column
df.show()
# Either train a new model (mode 'train') or load an exported one (mode 'inference').
if args.mode == 'train':
estimator = TFEstimator(main_fun, args) \
.setInputMapping({'image': 'image', 'label': 'label'}) \
.setModelDir(args.model_dir) \
.setExportDir(args.export_dir) \
.setClusterSize(args.cluster_size) \
.setTensorboard(args.tensorboard) \
.setEpochs(args.epochs) \
.setBatchSize(args.batch_size) \
.setGraceSecs(60)
# setGraceSecs(60) sets the grace period the estimator later passes to cluster.shutdown(grace_secs=...).
model = estimator.fit(df)
else: # args.mode == 'inference':
# using a trained/exported model
# Input mapping: DataFrame 'image' column -> model input tensor 'features'.
# Output mapping: model output tensor 'logits' -> DataFrame column 'prediction'.
model = TFModel(args) \
.setInputMapping({'image': 'features'}) \
.setOutputMapping({'logits': 'prediction'}) \
.setExportDir(args.export_dir) \
.setBatchSize(args.batch_size)
def argmax_fn(l):
    """Return the index of the largest element of ``l``.

    Ties resolve to the first (lowest) index, matching ``max``. A ``None``
    or empty input returns ``None`` instead of raising. When this runs as a
    Spark UDF, that yields a SQL null for the row rather than an exception
    inside the UDF.

    Args:
      l: a sized, indexable sequence of comparable values, such as one row's
        prediction vector. The parameter name is kept for backward
        compatibility.

    Returns:
      The int index of the maximum element, or None for None/empty input.
    """
    # len() check instead of truthiness so array-like inputs (e.g. numpy) keep working.
    if l is None or len(l) == 0:
        return None
    return max(range(len(l)), key=l.__getitem__)
# Wrap argmax_fn as a Spark UDF that returns an integer index.
argmax = udf(argmax_fn, IntegerType())
# Run inference, then add an 'argmax' column computed from the 'prediction' column.
preds = model.transform(df).withColumn('argmax', argmax('prediction'))
preds.show()
# Write the resulting DataFrame as JSON to args.output.
preds.write.json(args.output)
df.show()
# Either train a new model (mode 'train') or load an exported one (mode 'inference').
if args.mode == 'train':
estimator = TFEstimator(main_fun, args) \
.setInputMapping({'image': 'image', 'label': 'label'}) \
.setModelDir(args.model_dir) \
.setExportDir(args.export_dir) \
.setClusterSize(args.cluster_size) \
.setTensorboard(args.tensorboard) \
.setEpochs(args.epochs) \
.setBatchSize(args.batch_size) \
.setGraceSecs(60)
# setGraceSecs(60) sets the grace period the estimator later passes to cluster.shutdown(grace_secs=...).
model = estimator.fit(df)
else: # args.mode == 'inference':
# using a trained/exported model
# Input mapping: DataFrame 'image' column -> model input tensor 'conv2d_input'.
# Output mapping: model output tensor 'dense_1' -> DataFrame column 'prediction'.
model = TFModel(args) \
.setInputMapping({'image': 'conv2d_input'}) \
.setOutputMapping({'dense_1': 'prediction'}) \
.setExportDir(args.export_dir) \
.setBatchSize(args.batch_size)
def argmax_fn(l):
    """Return the index of the largest element of ``l``.

    Ties resolve to the first (lowest) index, matching ``max``. A ``None``
    or empty input returns ``None`` instead of raising. When this runs as a
    Spark UDF, that yields a SQL null for the row rather than an exception
    inside the UDF.

    Args:
      l: a sized, indexable sequence of comparable values, such as one row's
        prediction vector. The parameter name is kept for backward
        compatibility.

    Returns:
      The int index of the maximum element, or None for None/empty input.
    """
    # len() check instead of truthiness so array-like inputs (e.g. numpy) keep working.
    if l is None or len(l) == 0:
        return None
    return max(range(len(l)), key=l.__getitem__)
# Wrap argmax_fn as a Spark UDF that returns an integer index.
argmax = udf(argmax_fn, IntegerType())
# Run inference, then add an 'argmax' column computed from the 'prediction' column.
preds = model.transform(df).withColumn('argmax', argmax('prediction'))
preds.show()
# Write the resulting DataFrame as JSON to args.output.
preds.write.json(args.output)
# Body of the estimator's training method; its `def` line is outside this excerpt.
# Starts a TFCluster, feeds `dataset` to it from Spark, shuts it down, and returns a TFModel.
sc = SparkContext.getOrCreate()
logger.info("===== 1. train args: {0}".format(self.args))
logger.info("===== 2. train params: {0}".format(self._paramMap))
# Merged view of self.args and this estimator's params (logged below as "train args + params").
local_args = self.merge_args_params()
logger.info("===== 3. train args + params: {0}".format(local_args))
# If self.args carries a non-empty argv, pass that to the training function; otherwise pass the merged args.
tf_args = self.args.argv if self.args.argv else local_args
# InputMode.SPARK: training data is pushed from Spark via cluster.train() below.
cluster = TFCluster.run(sc, self.train_fn, tf_args, local_args.cluster_size, local_args.num_ps,
local_args.tensorboard, TFCluster.InputMode.SPARK, master_node=local_args.master_node, driver_ps_nodes=local_args.driver_ps_nodes)
# feed data, using a deterministic order for input columns (lexicographic by key)
input_cols = sorted(self.getInputMapping())
cluster.train(dataset.select(input_cols).rdd, local_args.epochs)
cluster.shutdown(grace_secs=self.getGraceSecs())
# New TFModel over the same args, with this estimator's param values copied onto it (pyspark Params._copyValues).
return self._copyValues(TFModel(self.args))
def __init__(self, tf_args):
    """Build a TFModel around ``tf_args`` and register its parameter defaults.

    Args:
      tf_args: model arguments; stored on ``self.args`` wrapped in a ``Namespace``.

    The registered defaults are:
      - empty input and output mappings
      - batch size 100
      - no model_dir and no export_dir
      - signature def key 'serving_default'
      - tag set 'serve'
    """
    super(TFModel, self).__init__()
    self.args = Namespace(tf_args)
    # Fresh dict literals on every call, so instances never share mapping defaults.
    param_defaults = dict(
        input_mapping={},
        output_mapping={},
        batch_size=100,
        model_dir=None,
        export_dir=None,
        signature_def_key='serving_default',
        tag_set='serve',
    )
    self._setDefault(**param_defaults)