Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_spark_ml_model(spark_context):
df = to_data_frame(spark_context, x_train, y_train, categorical=True)
test_df = to_data_frame(spark_context, x_test, y_test, categorical=True)
sgd = optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
sgd_conf = optimizers.serialize(sgd)
# Initialize Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config(sgd_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])
estimator.set_epochs(epochs)
estimator.set_batch_size(batch_size)
estimator.set_validation_split(0.1)
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
# Fitting a model returns a Transformer
pipeline = Pipeline(stages=[estimator])
fitted_pipeline = pipeline.fit(df)
# Evaluate Spark model by evaluating the underlying model
def test_serialization_estimator():
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_loss("categorical_crossentropy")
estimator.save("test.h5")
load_ml_estimator("test.h5")
model.add(Dense(10))
model.add(Activation('softmax'))
# Create Spark context
conf = SparkConf().setAppName('Mnist_Spark_MLP').setMaster('local[8]')
sc = SparkContext(conf=conf)
# Build RDD from numpy features and labels
df = to_data_frame(sc, x_train, y_train, categorical=True)
test_df = to_data_frame(sc, x_test, y_test, categorical=True)
sgd = optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
sgd_conf = optimizers.serialize(sgd)
# Initialize Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config(sgd_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])
estimator.set_nb_epoch(nb_epoch)
estimator.set_batch_size(batch_size)
estimator.set_validation_split(0.1)
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
# Fitting a model returns a Transformer
pipeline = Pipeline(stages=[estimator])
fitted_pipeline = pipeline.fit(df)
# Evaluate Spark model by evaluating the underlying model
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')
sgd = optimizers.SGD(lr=0.01)
sgd_conf = optimizers.serialize(sgd)
# Initialize Elephas Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config(sgd_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])
estimator.setFeaturesCol("scaled_features")
estimator.setLabelCol("index_category")
estimator.set_epochs(10)
estimator.set_batch_size(128)
estimator.set_num_workers(1)
estimator.set_verbosity(0)
estimator.set_validation_split(0.15)
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
# Fitting a model returns a Transformer
def load_ml_estimator(file_name):
f = h5py.File(file_name, mode='r')
elephas_conf = json.loads(f.attrs.get('distributed_config'))
config = elephas_conf.get('config')
return ElephasEstimator(**config)
def __init__(self, **kwargs):
super(ElephasEstimator, self).__init__()
self.set_params(**kwargs)