def test_dfutils(self):
# create a DataFrame of a single row consisting of standard types (str, int, int_array, float, float_array, binary)
row1 = ('text string', 1, [2, 3, 4, 5], -1.1, [-2.2, -3.3, -4.4, -5.5], bytearray(b'\xff\xfe\xfd\xfc'))
rdd = self.sc.parallelize([row1])
df1 = self.spark.createDataFrame(rdd, ['a', 'b', 'c', 'd', 'e', 'f'])
print("schema: {}".format(df1.schema))
# save the DataFrame as TFRecords
dfutil.saveAsTFRecords(df1, self.tfrecord_dir)
self.assertTrue(os.path.isdir(self.tfrecord_dir))
# reload the DataFrame from exported TFRecords
df2 = dfutil.loadTFRecords(self.sc, self.tfrecord_dir, binary_features=['f'])
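# binary_features flags column 'f' as raw bytes so the loader keeps it as binary instead of trying to decode it (bytes features are otherwise ambiguous)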
row2 = df2.take(1)[0]
print("row_saved: {}".format(row1))
print("row_loaded: {}".format(row2))
# confirm loaded values match original/saved values
self.assertEqual(row1[0], row2['a'])
self.assertEqual(row1[1], row2['b'])
self.assertEqual(row1[2], row2['c'])
self.assertAlmostEqual(row1[3], row2['d'], 6)
for i in range(len(row1[4])):
self.assertAlmostEqual(row1[4][i], row2['e'][i], 6)
print("row_saved: {}".format(row1))
print("row_loaded: {}".format(row2))
# confirm loaded values match original/saved values
self.assertEqual(row1[0], row2['a'])
self.assertEqual(row1[1], row2['b'])
self.assertEqual(row1[2], row2['c'])
self.assertAlmostEqual(row1[3], row2['d'], 6)
for i in range(len(row1[4])):
self.assertAlmostEqual(row1[4][i], row2['e'][i], 6)
print("type(f): {}".format(type(row2['f'])))
for i in range(len(row1[5])):
self.assertEqual(row1[5][i], row2['f'][i])
# check origin of each DataFrame
self.assertFalse(dfutil.isLoadedDF(df1))
self.assertTrue(dfutil.isLoadedDF(df2))
# references are equivalent
df_ref = df2
self.assertTrue(dfutil.isLoadedDF(df_ref))
# transformed DFs are not recognized as loaded, even if contents are identical
df3 = df2.filter(df2.a == 'text string')  # matches row1[0], so the contents are unchanged
self.assertFalse(dfutil.isLoadedDF(df3))
# re-assigned variables point at the transformed DF, which is not recognized as loaded
df2 = df3
self.assertFalse(dfutil.isLoadedDF(df2))
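# Minimal round-trip sketch distilled from the test above. Assumes an active
# SparkContext `sc` and SparkSession `spark`, with dfutil imported from
# tensorflowonspark; the path 'example_tfr' is illustrative.
df = spark.createDataFrame([('text string', bytearray(b'\x00\x01'))], ['text', 'raw'])
dfutil.saveAsTFRecords(df, 'example_tfr')
df_loaded = dfutil.loadTFRecords(sc, 'example_tfr', binary_features=['raw'])
assert dfutil.isLoadedDF(df_loaded)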
parser.add_argument("--epochs", help="number of epochs", type=int, default=3)
parser.add_argument("--format", help="example format: (csv|tfr)", choices=["csv", "tfr"], default="csv")
parser.add_argument("--images_labels", help="path to MNIST images and labels in parallelized format")
parser.add_argument("--learning_rate", help="learning rate", type=float, default=1e-3)
parser.add_argument("--mode", help="train|inference", choices=["train", "inference"], default="train")
parser.add_argument("--model_dir", help="path to save checkpoint", default="mnist_model")
parser.add_argument("--export_dir", help="path to export saved_model", default="mnist_export")
parser.add_argument("--output", help="HDFS path to save predictions", type=str, default="predictions")
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
if args.format == 'tfr':
# load TFRecords as a DataFrame
df = dfutil.loadTFRecords(sc, args.images_labels)
else: # args.format == 'csv':
# create RDD of input data
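# each CSV line is "label,pixel0,pixel1,...": vec[0] is the label, vec[1:] the flattened image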
def parse(ln):
vec = [int(x) for x in ln.split(',')]
return (vec[1:], vec[0])
images_labels = sc.textFile(args.images_labels).map(parse)
df = spark.createDataFrame(images_labels, ['image', 'label'])
df.show()
if args.mode == 'train':
estimator = TFEstimator(main_fun, args) \
.setInputMapping({'image': 'image', 'label': 'label'}) \
.setModelDir(args.model_dir) \
.setExportDir(args.export_dir)
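# Hedged continuation (the snippet is truncated here): fitting and scoring follow
# the Spark ML pattern used by the other examples on this page; the inference
# input mapping below is illustrative.
model = estimator.fit(df)
preds = model.setInputMapping({'image': 'image'}).transform(df)
preds.write.json(args.output)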
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
# Execution Modes
parser.add_argument("--train", help="train a model using Estimator", action="store_true")
parser.add_argument("--inference_mode", help="type of inferencing (none|signature|direct)", choices=["none", "signature", "direct"], default="none")
parser.add_argument("--inference_output", help="output of inferencing (predictions|features)", choices=["predictions", "features"], default="predictions")
args = parser.parse_args()
print("args:", args)
print("{0} ===== Start".format(datetime.now().isoformat()))
if args.format == "tfr":
df = dfutil.loadTFRecords(sc, args.images)
elif args.format == "csv":
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [int(float(x)) for x in ln.split(',')])
dataRDD = images.zip(labels)
df = spark.createDataFrame(dataRDD, ['image', 'label'])
else:
raise Exception("Unsupported format: {}".format(args.format))
# Pipeline API
if args.train:
# train a model using Spark Estimator fitted to a DataFrame
print("{0} ===== Estimator.fit()".format(datetime.now().isoformat()))
# dummy tf args (from imagenet/inception example)
tf_args = {'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94}
estimator = TFEstimator(mnist_dist_pipeline.map_fun, args, export_fn=mnist_dist_pipeline.export_fun)
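# The setter chain is cut off in this snippet; judging from the inception example
# below, it would continue with calls such as .setInputMapping(...), .setModelDir(...)
# and .setExportDir(...), ending in:
# model = estimator.fit(df)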
parser.add_argument("--input_mode", help="method to ingest data: (spark|tf)", choices=["spark","tf"], default="tf")
parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1)
parser.add_argument("--output", help="HDFS path to save output predictions", type=str)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("--train_dir", help="HDFS path to save/load model during train/inference", type=str)
parser.add_argument("--tfrecord_dir", help="HDFS path to temporarily save DataFrame to disk", type=str)
parser.add_argument("--train_data", help="HDFS path to training data", type=str)
parser.add_argument("--validation_data", help="HDFS path to validation data", type=str)
args, rem = parser.parse_known_args()
input_mode = TFCluster.InputMode.SPARK if args.input_mode == 'spark' else TFCluster.InputMode.TENSORFLOW
print("{0} ===== Start".format(datetime.now().isoformat()))
df = dfutil.loadTFRecords(sc, args.train_data, binary_features=['image/encoded'])
estimator = TFEstimator(main_fun, sys.argv, export_fn=inception_export.export) \
.setModelDir(args.train_dir) \
.setExportDir(args.export_dir) \
.setTFRecordDir(args.tfrecord_dir) \
.setClusterSize(args.cluster_size) \
.setNumPS(args.num_ps) \
.setInputMode(TFCluster.InputMode.TENSORFLOW) \
.setTensorboard(args.tensorboard)
print("{0} ===== Train".format(datetime.now().isoformat()))
model = estimator.fit(df)
print("{0} ===== Inference".format(datetime.now().isoformat()))
df = dfutil.loadTFRecords(sc, args.validation_data, binary_features=['image/encoded'])
preds = model.setTagSet(tf.saved_model.tag_constants.SERVING) \
.setSignatureDefKey(tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY) \
.setInputMapping({'image/encoded': 'jpegs', 'image/class/label': 'labels'}) \
.setOutputMapping({'top_5_acc': 'output'}) \
.transform(df)
preds.write.json(args.output)
print("{0} ===== Stop".format(datetime.now().isoformat()))
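# Per the TFModel API used above: setInputMapping maps DataFrame columns to the
# model's input tensors (column 'image/encoded' feeds input 'jpegs'), and
# setOutputMapping maps the 'top_5_acc' output tensor to an output column 'output'.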
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
# Execution Modes
parser.add_argument("--train", help="train a model using Estimator", action="store_true")
parser.add_argument("--inference_mode", help="type of inferencing (none|signature|direct|checkpoint)", choices=["none", "signature", "direct", "checkpoint"], default="none")
parser.add_argument("--inference_output", help="output of inferencing (predictions|features)", choices=["predictions", "features"], default="predictions")
args = parser.parse_args()
print("args:", args)
print("{0} ===== Start".format(datetime.now().isoformat()))
if args.format == "tfr":
df = dfutil.loadTFRecords(sc, args.images)
elif args.format == "csv":
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)
df = spark.createDataFrame(dataRDD, ['image', 'label'])
else:
raise Exception("Unsupported format: {}".format(args.format))
# Pipeline API
if args.train:
# train a model using Spark Estimator fitted to a DataFrame
print("{0} ===== Estimator.fit()".format(datetime.now().isoformat()))
# dummy tf args (from imagenet/inception example)
tf_args = {'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94}
estimator = TFEstimator(mnist_dist_pipeline.map_fun, tf_args)
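# As in the earlier pipeline snippet, the setter chain is truncated here; note that
# this variant passes the dummy tf_args dict (rather than the parsed command-line
# args) through to map_fun on the executors.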
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
# Execution Modes
parser.add_argument("--train", help="train a model using Estimator", action="store_true")
parser.add_argument("--inference_mode", help="type of inferencing (none|signature|direct|checkpoint)", choices=["none","signature","direct","checkpoint"], default="none")
parser.add_argument("--inference_output", help="output of inferencing (predictions|features)", choices=["predictions","features"], default="predictions")
args = parser.parse_args()
print("args:",args)
print("{0} ===== Start".format(datetime.now().isoformat()))
if args.format == "tfr":
df = dfutil.loadTFRecords(sc, args.images)
elif args.format == "csv":
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)
df = spark.createDataFrame(dataRDD, ['image', 'label'])
else:
raise Exception("Unsupported format: {}".format(args.format))
# Pipeline API
if args.train:
# train a model using Spark Estimator fitted to a DataFrame
print("{0} ===== Estimator.fit()".format(datetime.now().isoformat()))
# dummy tf args (from imagenet/inception example)
tf_args = { 'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94 }
estimator = TFEstimator(mnist_dist_pipeline.map_fun, tf_args) \
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("--epochs", help="number of epochs", type=int, default=3)
parser.add_argument("--format", help="example format: (csv|tfr)", choices=["csv", "tfr"], default="csv")
parser.add_argument("--images_labels", help="path to MNIST images and labels in parallelized format")
parser.add_argument("--mode", help="train|inference", choices=["train", "inference"], default="train")
parser.add_argument("--model_dir", help="path to save checkpoint", default="mnist_model")
parser.add_argument("--export_dir", help="path to export saved_model", default="mnist_export")
parser.add_argument("--output", help="HDFS path to save predictions", type=str, default="predictions")
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)
if args.format == 'tfr':
# load TFRecords as a DataFrame
df = dfutil.loadTFRecords(sc, args.images_labels)
else: # args.format == 'csv':
# create RDD of input data
def parse(ln):
vec = [int(x) for x in ln.split(',')]
return (vec[1:], vec[0])
images_labels = sc.textFile(args.images_labels).map(parse)
df = spark.createDataFrame(images_labels, ['image', 'label'])
df.show()
if args.mode == 'train':
estimator = TFEstimator(main_fun, args) \
.setInputMapping({'image': 'image', 'label': 'label'}) \
.setModelDir(args.model_dir) \
.setExportDir(args.export_dir) \