Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_mnist(data_type="train", location="/tmp/mnist"):
    """
    Load the MNIST features and labels as ndarrays.

    The data is read through ``mnist.read_data_sets``; according to the
    original documentation of this helper, the dataset is downloaded
    automatically when it is not already present at ``location``.

    :param data_type: "train" for training data and "test" for testing data.
    :param location: Location to store mnist dataset.
    :return: (features: ndarray, label: ndarray), where every label has been
        incremented by one.
    """
    features, labels = mnist.read_data_sets(location, data_type)
    # ClassNLLCriterion expects labels that start from 1 instead of 0.
    one_based_labels = labels + 1
    return features, one_based_labels
def get_data_rdd(dataset):
    """
    Build an RDD of ``[normalized_image, label_array]`` records from MNIST.

    Depends on names supplied by the surrounding scope: ``sc`` (used for
    ``parallelize``), ``data_num`` (how many leading samples to keep),
    ``normalizer``, ``mnist`` and ``np``.

    :param dataset: Split name forwarded to ``mnist.read_data_sets``.
    :return: RDD whose elements are two-item lists: the image passed through
        ``normalizer`` with ``mnist.TRAIN_MEAN`` / ``mnist.TRAIN_STD``, and
        the label wrapped in ``np.array``.
    """
    features, targets = mnist.read_data_sets("/tmp/mnist", dataset)

    def _to_record(pair):
        # pair[0] is the image, pair[1] is its label.
        image = normalizer(pair[0], mnist.TRAIN_MEAN, mnist.TRAIN_STD)
        return [image, np.array(pair[1])]

    # Only the first data_num samples are distributed.
    feature_rdd = sc.parallelize(features[:data_num])
    target_rdd = sc.parallelize(targets[:data_num])
    return feature_rdd.zip(target_rdd).map(_to_record)
def get_data_rdd(dataset):
    """
    Return an RDD pairing each normalized MNIST image with its label.

    Uses ``sc``, ``data_num``, ``normalizer``, ``mnist`` and ``np`` from the
    enclosing scope.

    :param dataset: Split name handed to ``mnist.read_data_sets``.
    :return: RDD of ``[normalized_image, np.array(label)]`` lists, limited to
        the first ``data_num`` samples.
    """
    images, labels = mnist.read_data_sets("/tmp/mnist", dataset)
    # Distribute images and labels separately, then zip them back together.
    paired = sc.parallelize(images[:data_num]).zip(sc.parallelize(labels[:data_num]))
    return paired.map(
        lambda sample: [
            normalizer(sample[0], mnist.TRAIN_MEAN, mnist.TRAIN_STD),
            np.array(sample[1]),
        ]
    )
# NOTE(review): this excerpt starts mid-function. `rdd`, `labels_rdd` and `model`
# are defined outside the visible lines, as is the `if` that pairs with the
# `else:` below — confirm against the full source before relying on these notes.
# Wrap the RDD as a TFDataset with float32 [28, 28, 1] features and batch_per_thread=20.
dataset = TFDataset.from_rdd(rdd,
features=(tf.float32, [28, 28, 1]),
batch_per_thread=20)
predictor = TFPredictor.from_keras(model, dataset)
# Pair each prediction with its label, compare the arg-max index with the label,
# and take the mean of the resulting booleans as the accuracy.
accuracy = predictor.predict().zip(labels_rdd).map(lambda x: np.argmax(x[0]) == x[1]).mean()
print("predict accuracy is %s" % accuracy)
else:
# using keras api for local evaluation
model.compile(optimizer='rmsprop',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
# Load the "test" split and pass the images through normalizer together with
# the training-set mean/std constants before evaluating.
(images_data, labels_data) = mnist.read_data_sets("/tmp/mnist", "test")
images_data = normalizer(images_data, mnist.TRAIN_MEAN, mnist.TRAIN_STD)
result = model.evaluate(images_data, labels_data)
# Print the metric names followed by the evaluation result.
print(model.metrics_names)
print(result)
def main(max_epoch):
# Loads MNIST, standardizes it, and builds and compiles a small Keras classifier.
# NOTE(review): `max_epoch` is not used in the visible lines, and the model and
# data are neither trained on nor returned here — the function body may
# continue beyond this excerpt; confirm against the full source.
# The value returned by init_nncontext() is deliberately discarded.
_ = init_nncontext()
# Read the "train" and "test" splits from /tmp/mnist.
(training_images_data, training_labels_data) = mnist.read_data_sets("/tmp/mnist", "train")
(testing_images_data, testing_labels_data) = mnist.read_data_sets("/tmp/mnist", "test")
# Both splits are standardized with the training-set mean and std.
training_images_data = (training_images_data - mnist.TRAIN_MEAN) / mnist.TRAIN_STD
testing_images_data = (testing_images_data - mnist.TRAIN_MEAN) / mnist.TRAIN_STD
# Flatten the (28, 28, 1) input, then two 64-unit ReLU layers and a 10-unit softmax output.
model = tf.keras.Sequential(
[tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax'),
]
)
# Compile with RMSprop, sparse categorical cross-entropy loss and an accuracy metric.
model.compile(optimizer='rmsprop',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
def get_data_rdd(dataset, sc):
    """
    Build an RDD of ``(standardized_image, label_array)`` tuples from MNIST.

    :param dataset: Split name forwarded to ``mnist.read_data_sets``.
    :param sc: Object providing ``parallelize`` (presumably a SparkContext).
    :return: RDD of 2-tuples: the image with ``mnist.TRAIN_MEAN`` subtracted
        and divided by ``mnist.TRAIN_STD``, and the label wrapped in
        ``np.array``.
    """
    from bigdl.dataset import mnist

    images, labels = mnist.read_data_sets("/tmp/mnist", dataset)

    def _standardize(pair):
        # pair[0] is the image, pair[1] is its label.
        image = (pair[0] - mnist.TRAIN_MEAN) / mnist.TRAIN_STD
        return (image, np.array(pair[1]))

    # Distribute images and labels separately, then zip them back together.
    return sc.parallelize(images).zip(sc.parallelize(labels)).map(_standardize)