Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_z(self):
self.lp.reset(password=None, require_password=False)
self.lp.add_wf(wf_creator_get_z([5, 11, 'blue'], self.lp))
for _ in range(2):
launch_rocket(self.lp)
col = self.db.test_get_z
loop1 = col.find_one({'index': 1})
loop2 = col.find_one({'index': 2})
self.assertEqual(col.find({}).count(), 4)
self.assertEqual(loop1['x'], [5, 11, 'blue'])
self.assertEqual(loop1['z'], [25.0, 121.0])
self.assertEqual(loop2['x'], [3, 12.0, 'green'])
self.assertEqual(loop2['z'], [9, 144.0])
def test_complex(self):
self.lp.reset(password=None, require_password=False)
self.lp.add_wf(wf_creator_complex([5, 11, 'blue'], self.lp))
for _ in range(10):
launch_rocket(self.lp)
col = self.db.test_complex
loop1 = col.find({'index': 1}) # should return one doc, for the first WF
loop2 = col.find({'index': 2}) # should return one doc, for the second WF
reserved = col.find({'y': 'reserved'})
self.assertEqual(col.find({}).count(), 4)
self.assertEqual(reserved.count(), 1)
self.assertEqual(loop1.count(), 1)
self.assertEqual(loop2.count(), 1)
# Run run_num iterations using Skopt Gaussian Processes
gp_best = []
wf = workflow_creator(input_dict, 'skopt_gp')
launchpad.add_wf(wf)
for i in range(run_num):
launch_rocket(launchpad)
gp_best.append(manageDB.get_optima('D', min_or_max='max')[0])
manageDB.clean()
launchpad.defuse_wf(launchpad.get_fw_ids()[-1])
# Run run_num iterations using a dummy optimizer (returns random)
dummy_best = []
wf = workflow_creator(input_dict, 'dummy')
launchpad.add_wf(wf)
for i in range(run_num):
launch_rocket(launchpad)
dummy_best.append(manageDB.get_optima('D', min_or_max='max')[0])
manageDB.clean()
iterations = list(range(run_num))
print("GP best:", gp_best[-1])
print("Dummy best: ", dummy_best[-1])
plt.plot(iterations,gp_best,'g', iterations, dummy_best,'r')
plt.show()
if __name__ == "__main__":
# set up the LaunchPad and reset it
launchpad = LaunchPad()
launchpad.reset('', require_password=False)
fw_spec = {'input':{'A':1, 'B':2, 'C':3}}
param_list = {'data':['X', 'y'], 'hyperparameters':[]}
# create the Firework consisting of a single task
firetask1 = CalculateTask()
firetask2 = OptimizeTask()
firework = Firework([firetask1, firetask2])
# store workflow and launch it locally
launchpad.add_wf(firework)
launch_rocket(launchpad)
# Run run_num iterations using Skopt Gaussian Processes
gp_best = []
wf = workflow_creator(input_dict, 'skopt_gp')
launchpad.add_wf(wf)
for i in range(run_num):
launch_rocket(launchpad)
gp_best.append(manageDB.get_optima('f', min_or_max='max')[0])
manageDB.clean()
launchpad.defuse_wf(launchpad.get_fw_ids()[-1])
# Run run_num iterations using a dummy optimizer (returns random)
dummy_best = []
wf = workflow_creator(input_dict, 'dummy')
launchpad.add_wf(wf)
for i in range(run_num):
launch_rocket(launchpad)
dummy_best.append(manageDB.get_optima('f', min_or_max='max')[0])
manageDB.clean()
iterations = list(range(run_num))
print("GP best:", gp_best[-1])
print("Dummy best: ", dummy_best[-1])
plt.plot(iterations,gp_best,'g', iterations, dummy_best,'r')
plt.show()
manageDB.clean()
# Run some number of iterations until dummy iteration has converged
launchpad.defuse_wf(launchpad.get_fw_ids()[-1])
dummy_best = []
dummy_iter = 0
wf = workflow_creator(input_dict, 'dummy')
launchpad.add_wf(wf)
dummy_iter = dummy_iter + 1
launch_rocket(launchpad)
dummy_best.append(manageDB.get_optima('D', min_or_max='max')[0])
while (dummy_best[-1] <= tolerance * max_val):
dummy_iter = dummy_iter + 1
launch_rocket(launchpad)
dummy_best.append(manageDB.get_optima('D', min_or_max='max')[0])
manageDB.clean()
print("GP iterations:", gp_iter)
print("Dummy iterations:", dummy_iter)
plt.plot(list(range(gp_iter)), gp_best, 'g', list(range(dummy_iter)), dummy_best, 'r')
plt.show()
def test_duplicates(self):
self.lp.reset(password=None, require_password=False)
self.lp.add_wf(wf_creator_duplicates([5, 11, 'blue'], self.lp))
for _ in range(2):
launch_rocket(self.lp)
col = self.db.test_duplicates
loop1 = col.find({'x': [5, 11, 'blue']}) # should return one doc, for the first WF
loop2 = col.find({'x': [3, 12, 'green']}) # should return one doc, for the second WF
reserved = col.find({'y': 'reserved'})
self.assertEqual(col.find({}).count(), 4)
self.assertEqual(reserved.count(), 1)
self.assertEqual(loop1.count(), 1)
self.assertEqual(loop2.count(), 1) # no duplicates are in the db
db = getattr(conn, rundb)
collection = db.test_perovskites
filedir = os.path.dirname(os.path.realpath(__file__))
launchpad = LaunchPad(name=rundb)
launchpad.reset(password=None, require_password=False)
launchpad.add_wf(wf_creator(random.choice(space_noex), predictor, get_z, launchpad,
filedir + '/space_gs_mend.p',
None,
# filedir + '/persistent_z.p',
chemical_rules=False))
y = []
cands = 0
while cands != n_cands:
launch_rocket(launchpad)
cands = collection.find({'y':30.0}).count()
y.append(cands)
pickle.dump(y, open(filename + "_{}".format(i), 'w'))
Y.append(y)
launchpad.connection.drop_database(TESTDB_NAME)
pickle.dump(Y, open(filename, 'w'))
"""
fetch a single firework from the database and run it on the machine where this script
is invoked
"""
from fireworks import LaunchPad
from fireworks.core.rocket_launcher import launch_rocket #rapidfire
launchpad = LaunchPad(host='localhost', port=27017, name='fireworks',
username="km468", password="km468")
print 'fireworks in the database before adding the workflow: \n', launchpad.get_fw_ids()
launch_rocket(launchpad)#, fworker=None, fw_id=None, strm_lvl='INFO')
print 'fireworks in the database: \n', launchpad.get_fw_ids()
#rapidfire(launchpad)