# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this file is a concatenation of unrelated code fragments with
# indentation stripped; nothing in it is runnable as-is. This fragment builds
# FireWorks for an elasticity workflow: add a deformed structure to the SNL
# database, then optimize it. Free names (`d_struct`, `fw_spec`, `i`,
# `d_struct_set`, `QA_DB`, `QA_VASP`, `QA_CONTROL`, task classes, ...) come
# from the missing enclosing scope — confirm against the original source.
fws=[]
connections={}
# Alphabetical formula string, reused in every Firework name below.
f = Composition(d_struct.formula).alphabetical_formula
snl = StructureNL(d_struct, 'Joseph Montoya ',
projects=["Elasticity"])
tasks = [AddSNLTask()]
# Caller-supplied priority for the SNL-insertion step (defaults to 1).
snl_priority = fw_spec.get('priority', 1)
spec = {'task_type': 'Add Deformed Struct to SNL database',
'snl': snl.as_dict(),
'_queueadapter': QA_DB,
'_priority': snl_priority}
# If the parent run already resolved an SNL group, force-reuse it instead of
# re-matching; the plain 'snl' key is replaced by the 'force_*' variants.
if 'snlgroup_id' in fw_spec and isinstance(snl, MPStructureNL):
spec['force_mpsnl'] = snl.as_dict()
spec['force_snlgroup_id'] = fw_spec['snlgroup_id']
del spec['snl']
# fw_ids are derived from loop index `i` (defined outside this fragment):
# -1000+i*10 is the SNL-add step, -999+i*10 its dependent optimization step.
fws.append(Firework(tasks, spec,
name=get_slug(f + '--' + spec['task_type']),
fw_id=-1000+i*10))
connections[-1000+i*10] = [-999+i*10]
# Build the VASP spec for the deformed-structure optimization run.
spec = snl_to_wf._snl_to_spec(snl,
parameters={'exact_structure':True})
spec = update_spec_force_convergence(spec)
spec['deformation_matrix'] = d_struct_set.deformations[i].tolist()
spec['original_task_id'] = fw_spec["task_id"]
spec['_priority'] = fw_spec['_priority']*2
#Turn off dupefinder for deformed structure
del spec['_dupefinder']
spec['task_type'] = "Optimize deformed structure"
fws.append(Firework([VaspWriterTask(), SetupElastConstTask(),
get_custodian_task(spec)],
spec,
name=get_slug(f + '--' + spec['task_type']),
# NOTE(review): the Firework(...) call above is truncated — the fw_id
# argument and closing parentheses are missing, and the lines below look
# spliced in from a different function (`priority` is not defined at this
# point in the visible code). Needs the original source to repair.
spec['_priority'] = priority
spec["_pass_job_info"] = True
spec['_allow_fizzled_parents'] = False
spec['_queueadapter'] = QA_VASP
spec['task_type'] = "force convergence" # Change name here: delete Vasp?
tasks = [VaspWriterTask(), get_custodian_task(spec)]
fws.append(Firework(tasks, spec, name=get_slug(f + '--' + spec['task_type']), fw_id=1))
connections[0] = [1] # define fw_id=1 is dependent on completion of fw_id=0
# insert into DB - Force optimization
spec = {'task_type': 'VASP db insertion', '_priority': priority, "_pass_job_info": True, '_allow_fizzled_parents': True, '_queueadapter': QA_DB}
fws.append(Firework([VaspToDBTask()], spec, name=get_slug(f + '--' + spec['task_type']), fw_id=2))
connections[1] = [2] # define fw_id=2 is dependent on completion of fw_id=1
spec= {'task_type': 'Setup DFPT Dielectrics Task', '_priority': priority, "_pass_job_info": True, '_allow_fizzled_parents': False, '_queueadapter': QA_CONTROL}
fws.append(Firework([SetupDFPTDielectricsTask()], spec, name=get_slug(f + '--' + spec['task_type']), fw_id=3))
connections[2] = [3]
# Workflow-level metadata: structure info plus a hard-coded run version and,
# when present, the Materials Project submission id from the SNL data.
wf_meta = get_meta_from_structure(snl.structure)
wf_meta['run_version'] = 'May 2013 (1)'
if '_materialsproject' in snl.data and 'submission_id' in snl.data['_materialsproject']:
wf_meta['submission_id'] = snl.data['_materialsproject']['submission_id']
return Workflow(fws, connections, name=Composition(snl.structure.composition.reduced_formula).alphabetical_formula, metadata=wf_meta)
# run DFPT for static dielectrics run:
# if 'force_convergence' in snl.projects:
# relaxed_structure = spec['output']['crystal']
# spec['vasp']['poscar'] = relaxed_structure
# NOTE(review): tail of a "detect lost runs" routine (FireWorks LaunchPad
# style). The enclosing `def` line is missing and indentation has been
# stripped, so block structure is inferred, not certain. `lost_launch_idxs`,
# `potential_lost_fw_ids`, `lost_fw_ids`, `m_fw`, `fw_id`, `fizzle`, `rerun`,
# and `self` all come from the missing enclosing scope.
# Record this launch index as lost for its fw_id (create list or append).
if not lost_launch_idxs.get(fw_id):
lost_launch_idxs[fw_id] = [m_fw.launch_idx]
else:
lost_launch_idxs[fw_id].append(m_fw.launch_idx)
# Check if EVERY FIREWORK with a given fw_id failed. If so, add to lost_fw_ids
for fw_id in potential_lost_fw_ids: # tricky: figure out what's actually lost
fws = self._find_fws(fw_id)
# only RUNNING FireWorks can be "lost", i.e. not defused or archived
not_lost = [f['launch']['launch_idx'] for f in fws if f['launch']['launch_idx'] not in lost_launch_idxs[fw_id]]
if len(not_lost) == 0: # all launches are lost - we are lost!
lost_fw_ids.append(fw_id)
else:
for l_idx in not_lost:
l_state = self.get_fw_dict_by_id(fw_id, launch_idx=l_idx)['state']
# A launch ranked above FIZZLED is still viable, so this fw_id is
# not fully lost; the for/else below fires only when the loop
# finished without finding any viable launch.
if Firework.STATE_RANKS[l_state] > Firework.STATE_RANKS['FIZZLED']:
break
else:
lost_fw_ids.append(fw_id) # all Launches not lost are anyway FIZZLED / ARCHIVED
# fizzle and rerun
# Only lost launches of fully-lost fw_ids are marked FIZZLED; rerun is a
# second, optional step on top of fizzling.
if fizzle or rerun:
for fw_id in lost_fw_ids:
for launch_idx in lost_launch_idxs[fw_id]:
self.mark_fizzled(fw_id, launch_idx)
if rerun:
self.rerun_fw(fw_id)
# return the lost_launch_idxs (i.e. the lost fireworks)
# return the lost_fw_ids (i.e. runs that failed for EVERY launch)
return lost_launch_idxs, lost_fw_ids
# NOTE(review): a second fragment of a deformed-structure workflow builder,
# near-duplicating an earlier fragment in this file. It begins mid-`if` (the
# condition guarding these two lines is outside the visible span) and ends
# mid-call. Free names (`fw_spec`, `i`, `d_struct_set`, `f`, `snl`, ...) come
# from the missing enclosing scope.
spec['force_snlgroup_id'] = fw_spec['snlgroup_id']
del spec['snl']
fws.append(Firework(tasks, spec,
name=get_slug(f + '--' + spec['task_type']),
fw_id=-1000+i*10))
connections[-1000+i*10] = [-999+i*10]
# Build the VASP spec for the deformed-structure optimization run.
spec = snl_to_wf._snl_to_spec(snl,
parameters={'exact_structure':True})
spec = update_spec_force_convergence(spec)
spec['deformation_matrix'] = d_struct_set.deformations[i].tolist()
spec['original_task_id'] = fw_spec["task_id"]
spec['_priority'] = fw_spec['_priority']*2
#Turn off dupefinder for deformed structure
del spec['_dupefinder']
spec['task_type'] = "Optimize deformed structure"
fws.append(Firework([VaspWriterTask(), SetupElastConstTask(),
get_custodian_task(spec)],
spec,
name=get_slug(f + '--' + spec['task_type']),
fw_id=-999+i*10))
# DB-insertion step runs at triple the base priority and is allowed to run
# even when the parent optimization fizzled.
priority = fw_spec['_priority']*3
spec = {'task_type': 'VASP db insertion',
'_priority': priority,
'_allow_fizzled_parents': True,
'_queueadapter': QA_DB,
'elastic_constant':"deformed_structure",
'clean_task_doc':True,
'deformation_matrix':d_struct_set.deformations[i].tolist(),
'original_task_id':fw_spec["task_id"]}
fws.append(Firework([VaspToDBTask(), AddElasticDataToDBTask()], spec,
name=get_slug(f + '--' + spec['task_type']),
# NOTE(review): this Firework(...) call is truncated — the fw_id argument
# and closing parentheses are missing; an unrelated fragment begins on the
# next line of the file.
# NOTE(review): fragment of a defect-workflow builder (IM_Defects project).
# It begins mid-function (`fws`, `d_struct`, `data`, `history`, `fw_spec`,
# `i`, `priority`, `snl_spec`, and the QA_* queue adapters are defined in the
# missing enclosing scope) and mixes in lines that appear spliced from a
# separate force-convergence builder (see the tracker/GGA section below).
connections={}
f = Composition(d_struct.formula).alphabetical_formula
snl = StructureNL(d_struct, 'Bharat Medasani ', projects=["IM_Defects"],
data=data, history=history)
tasks = [AddSNLTask()]
snl_priority = fw_spec.get('priority', 1)
spec = {
'task_type': 'Add Defect Struct to SNL database', 'snl': snl.as_dict(),
'_queueadapter': QA_DB, '_priority': snl_priority
}
# Force-reuse a previously resolved SNL group when available.
if 'snlgroup_id' in fw_spec and isinstance(snl, MPStructureNL):
spec['force_mpsnl'] = snl.as_dict()
spec['force_snlgroup_id'] = fw_spec['snlgroup_id']
del spec['snl']
fws.append(Firework(tasks, spec, name=get_slug(f + '--' + spec['task_type']), fw_id=-1000+i*10))
connections[-1000+i*10] = [-999+i*10]
spec = snl_to_wf._snl_to_spec(snl, parameters={'exact_structure':True})
# i == 0 is the pristine bulk supercell (static run); i > 0 are defect
# supercells (optimization runs).
if not i:
spec = update_spec_bulk_supercell(spec)
spec['task_type'] = "Static calculation of bulk supercell "
else:
spec = update_spec_defect_supercells(spec)
spec['task_type'] = "Optimize defect supercell "
spec['_priority'] = fw_spec['_priority']*2
# File trackers let FireWorks tail VASP output files during the run.
trackers = [Tracker('FW_job.out'), Tracker('FW_job.error'), Tracker('vasp.out'), Tracker('OUTCAR'), Tracker('OSZICAR'), Tracker('OUTCAR.relax1'), Tracker('OUTCAR.relax2')]
trackers_db = [Tracker('FW_job.out'), Tracker('FW_job.error')]
# run GGA structure optimization
#spec = _snl_to_spec(snl, enforce_gga=True, parameters=parameters)
spec.update(snl_spec)
spec['task_type'] = "Vasp force convergence optimize structure (2x)"
tasks = [VaspWriterTask(), get_custodian_task(spec)]
fws.append(Firework(tasks, spec,
name=get_slug(f + '--' + spec['task_type']), fw_id=1))
# insert into DB - GGA structure optimization
spec = {'task_type': 'VASP db insertion', '_priority': priority,
'_allow_fizzled_parents': True, '_queueadapter': QA_DB,
'clean_task_doc':True, 'elastic_constant':"force_convergence"}
fws.append(Firework([VaspToDBTask()], spec,
name=get_slug(f + '--' + spec['task_type']), fw_id=2))
connections[1] = [2]
spec = {'task_type': 'Setup Deformed Struct Task', '_priority': priority,
'_queueadapter': QA_CONTROL}
fws.append(Firework([SetupDeformedStructTask()], spec,
name=get_slug(f + '--' + spec['task_type']),fw_id=3))
connections[2] = [3]
# Workflow metadata: structure info, hard-coded run version, optional MP
# submission id carried over from the SNL data.
wf_meta = get_meta_from_structure(snl.structure)
wf_meta['run_version'] = 'May 2013 (1)'
if '_materialsproject' in snl.data and 'submission_id' in snl.data['_materialsproject']:
wf_meta['submission_id'] = snl.data['_materialsproject']['submission_id']
return Workflow(fws, connections, name=Composition(
snl.structure.composition.reduced_formula).alphabetical_formula, metadata=wf_meta)
# NOTE(review): fragment of an elastic/force-convergence workflow entry point
# (resembles snl_to_wf_elastic). The `def` line is outside the visible span;
# `fws` and `connections` are presumably initialized in the missing part.
parameters = parameters if parameters else {}
snl_priority = parameters.get('priority', 1)
priority = snl_priority * 2 # once we start a job, keep going!
f = Composition(snl.structure.composition.reduced_formula).alphabetical_formula
# add the SNL to the SNL DB and figure out duplicate group
tasks = [AddSNLTask()]
spec = {'task_type': 'Add to SNL database', 'snl': snl.as_dict(),
'_queueadapter': QA_DB, '_priority': snl_priority}
# Force-reuse a previously resolved SNL group when the caller supplies one.
if 'snlgroup_id' in parameters and isinstance(snl, MPStructureNL):
spec['force_mpsnl'] = snl.as_dict()
spec['force_snlgroup_id'] = parameters['snlgroup_id']
del spec['snl']
fws.append(Firework(tasks, spec,
name=get_slug(f + '--' + spec['task_type']), fw_id=0))
connections[0] = [1]
parameters["exact_structure"] = True
# run GGA structure optimization for force convergence
spec = snl_to_wf._snl_to_spec(snl, parameters=parameters)
user_vasp_settings = parameters.get("user_vasp_settings")
spec = update_spec_force_convergence(spec, user_vasp_settings)
#import pdb;pdb.set_trace()
spec['run_tags'].append("origin")
spec['_priority'] = priority
spec['_queueadapter'] = QA_VASP
del spec['_dupefinder']
spec['task_type'] = "Vasp force convergence optimize structure (2x)"
tasks = [VaspWriterTask(), get_custodian_task(spec)]
fws.append(Firework(tasks, spec,
# NOTE(review): this Firework(...) call is truncated — the name/fw_id
# arguments and closing parentheses are missing; an unrelated fragment
# begins on the next line of the file.
# NOTE(review): fragment of a workfunction-calculation setup script. It
# begins mid-scope: `incar` must already be a plain dict here (it is only
# converted to an Incar object on the next line — presumably built earlier
# from relax["input"]["incar"]; confirm against the original source).
# `cwd`, `folder`, `relax`, `poscar`, `debug`, `cust_params`,
# `db_credentials`, `task`, `fws`, `fw_ids`, and `launchpad_dir` come from
# the missing enclosing scope.
# Static-run settings: no ionic steps (NSW=0, IBRION=-1), write the total
# local potential (LVTOT) needed for a workfunction analysis.
incar.update({"NSW": 0, "IBRION": -1, "LVTOT": True, "EDIFF": 0.0001})
incar = Incar.from_dict(incar)
incar.write_file(os.path.join(cwd, folder, "INCAR"))
# Reuse the k-points from the prior relaxation run's inputs.
kpoints = Kpoints.from_dict(relax["input"]["kpoints"])
kpoints.write_file(os.path.join(cwd, folder, "KPOINTS"))
poscar.to("POSCAR", os.path.join(cwd, folder, "POSCAR"))
# One Firework per folder: run VASP under Custodian, then insert results.
tasks = [RunCustodianTask(cwd=cwd, folder=folder, debug=debug,
custodian_params=cust_params),
InsertTask(cwd=cwd, folder=folder, debug=debug,
db_credentials=db_credentials,
task_id=task)]
fw = Firework(tasks, name=folder)
fw_ids.append(fw.fw_id)
fws.append(fw)
wf = Workflow(fws, name='Workfunction Calculations')
# Launchpad config is read from $HOME/<launchpad_dir>/my_launchpad.yaml;
# raises KeyError if HOME is unset.
launchpad = LaunchPad.from_file(os.path.join(os.environ["HOME"],
launchpad_dir,
"my_launchpad.yaml"))
launchpad.add_wf(wf)
Args:
m_dict (dict): either a Workflow dict or a Firework dict
Returns:
Workflow
"""
# NOTE(review): the `def` line and opening docstring quotes for this
# (presumably classmethod) deserializer are outside the visible span.
# Accept either a full Workflow dict (detected by the 'fws' key) or a bare
# Firework dict, which is wrapped into a single-Firework Workflow.
if 'fws' in m_dict:
# Timestamps are optional; 'links', 'metadata' and the fireworks are not.
created_on = m_dict.get('created_on')
updated_on = m_dict.get('updated_on')
return Workflow([Firework.from_dict(f) for f in m_dict['fws']],
Workflow.Links.from_dict(m_dict['links']), m_dict.get('name'),
m_dict['metadata'], created_on, updated_on)
else:
return Workflow.from_Firework(Firework.from_dict(m_dict))