Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
user_incar_settings = {"ISMEAR": 1, "SIGMA": 0.2}
else:
user_incar_settings = {}
print 'Adding more runs...'
type_name = 'GGA+U' if 'GGA+U' in fw_spec['prev_task_type'] else 'GGA'
snl = StructureNL.from_dict(fw_spec['mpsnl'])
f = Composition(snl.structure.composition.reduced_formula).alphabetical_formula
fws = []
connections = {}
priority = fw_spec['_priority']
trackers = [Tracker('FW_job.out'), Tracker('FW_job.error'), Tracker('vasp.out'), Tracker('OUTCAR'), Tracker('OSZICAR')]
trackers_db = [Tracker('FW_job.out'), Tracker('FW_job.error')]
# run GGA static
spec = fw_spec # pass all the items from the current spec to the new
spec.update({'task_type': '{} static v2'.format(type_name), '_queueadapter': QA_VASP_SMALL,
'_dupefinder': DupeFinderVasp().to_dict(), '_priority': priority, '_trackers': trackers})
fws.append(
Firework(
[VaspCopyTask({'use_CONTCAR': True, 'skip_CHGCAR': True}), SetupStaticRunTask({"kpoints_density": static_dens, 'user_incar_settings': user_incar_settings}),
get_custodian_task(spec)], spec, name=get_slug(f+'--'+spec['task_type']), fw_id=-10))
# insert into DB - GGA static
spec = {'task_type': 'VASP db insertion', '_queueadapter': QA_DB,
'_allow_fizzled_parents': True, '_priority': priority*2, "_dupefinder": DupeFinderDB().to_dict(), '_trackers': trackers_db}
fws.append(
Firework([VaspToDBTask()], spec, name=get_slug(f+'--'+spec['task_type']), fw_id=-9))
else:
user_incar_settings = {}
print 'Adding more runs...'
type_name = 'GGA+U' if 'GGA+U' in fw_spec['prev_task_type'] else 'GGA'
snl = StructureNL.from_dict(fw_spec['mpsnl'])
f = Composition(snl.structure.composition.reduced_formula).alphabetical_formula
fws = []
connections = {}
priority = fw_spec['_priority']
trackers = [Tracker('FW_job.out'), Tracker('FW_job.error'), Tracker('vasp.out'), Tracker('OUTCAR'), Tracker('OSZICAR')]
trackers_db = [Tracker('FW_job.out'), Tracker('FW_job.error')]
# run GGA static
spec = fw_spec # pass all the items from the current spec to the new
spec.update({'task_type': '{} static v2'.format(type_name), '_queueadapter': QA_VASP_SMALL,
'_dupefinder': DupeFinderVasp().to_dict(), '_priority': priority, '_trackers': trackers})
fws.append(
Firework(
[VaspCopyTask({'use_CONTCAR': True, 'skip_CHGCAR': True}), SetupStaticRunTask({"kpoints_density": static_dens, 'user_incar_settings': user_incar_settings}),
get_custodian_task(spec)], spec, name=get_slug(f+'--'+spec['task_type']), fw_id=-10))
# insert into DB - GGA static
spec = {'task_type': 'VASP db insertion', '_queueadapter': QA_DB,
'_allow_fizzled_parents': True, '_priority': priority*2, "_dupefinder": DupeFinderDB().to_dict(), '_trackers': trackers_db}
fws.append(
Firework([VaspToDBTask()], spec, name=get_slug(f+'--'+spec['task_type']), fw_id=-9))
connections[-10] = -9
snl_spec['mpsnl'] = snl.as_dict()
else:
raise ValueError("improper use of force SNL")
snl_spec['snlgroup_id'] = parameters['snlgroup_id']
else:
# add the SNL to the SNL DB and figure out duplicate group
tasks = [AddSNLTask()]
spec = {'task_type': 'Add to SNL database',
'snl': snl.as_dict(), '_queueadapter': QA_DB,
'_priority': snl_priority}
fws.append(Firework(tasks, spec, fw_id=0,
name=get_slug(f + '--' + spec['task_type'])))
connections[0] = [1]
trackers = [Tracker('FW_job.out'), Tracker('FW_job.error'),
Tracker('vasp.out'), Tracker('OUTCAR'), Tracker('OSZICAR'),
Tracker('OUTCAR.relax1'), Tracker('OUTCAR.relax2')]
trackers_db = [Tracker('FW_job.out'), Tracker('FW_job.error')]
# run GGA structure optimization
spec = _snl_to_spec(snl, enforce_gga=True, parameters=parameters)
spec.update(snl_spec)
spec['_priority'] = priority
spec['_queueadapter'] = QA_VASP
spec['_trackers'] = trackers
tasks = [VaspWriterTask(), get_custodian_task(spec)]
fws.append(Firework(tasks, spec, name=get_slug(f + '--' + spec['task_type']), fw_id=1))
# insert into DB - GGA structure optimization
spec = {'task_type': 'VASP db insertion', '_priority': priority*2,
'_allow_fizzled_parents': True, '_queueadapter': QA_DB,
"_dupefinder": DupeFinderDB().to_dict(), '_trackers': trackers_db}
fws.append(Firework([VaspToDBTask()], spec, fw_id=2,
else:
raise ValueError("improper use of force SNL")
snl_spec['snlgroup_id'] = parameters['snlgroup_id']
else:
# add the SNL to the SNL DB and figure out duplicate group
tasks = [AddSNLTask()]
spec = {'task_type': 'Add to SNL database',
'snl': snl.as_dict(), '_queueadapter': QA_DB,
'_priority': snl_priority}
fws.append(Firework(tasks, spec, fw_id=0,
name=get_slug(f + '--' + spec['task_type'])))
connections[0] = [1]
trackers = [Tracker('FW_job.out'), Tracker('FW_job.error'),
Tracker('vasp.out'), Tracker('OUTCAR'), Tracker('OSZICAR'),
Tracker('OUTCAR.relax1'), Tracker('OUTCAR.relax2')]
trackers_db = [Tracker('FW_job.out'), Tracker('FW_job.error')]
# run GGA structure optimization
spec = _snl_to_spec(snl, enforce_gga=True, parameters=parameters)
spec.update(snl_spec)
spec['_priority'] = priority
spec['_queueadapter'] = QA_VASP
spec['_trackers'] = trackers
tasks = [VaspWriterTask(), get_custodian_task(spec)]
fws.append(Firework(tasks, spec, name=get_slug(f + '--' + spec['task_type']), fw_id=1))
# insert into DB - GGA structure optimization
spec = {'task_type': 'VASP db insertion', '_priority': priority*2,
'_allow_fizzled_parents': True, '_queueadapter': QA_DB,
"_dupefinder": DupeFinderDB().to_dict(), '_trackers': trackers_db}
fws.append(Firework([VaspToDBTask()], spec, fw_id=2,
name=get_slug(f + '--' + spec['task_type'])))
def add_trackers(original_wf, tracked_files=None, nlines=25):
"""
Every FireWork that runs VASP also tracks the OUTCAR, OSZICAR, etc using FWS Trackers.
Args:
original_wf (Workflow)
tracked_files (list) : list of files to be tracked
nlines (int): number of lines at the end of files to be tracked
"""
if tracked_files is None:
tracked_files = ["OUTCAR", "OSZICAR"]
trackers = [Tracker(f, nlines=nlines, allow_zipped=True) for f in tracked_files]
wf_dict = original_wf.to_dict()
for idx_fw, idx_t in get_fws_and_tasks(original_wf, task_name_constraint="RunVasp"):
if "_trackers" in wf_dict["fws"][idx_fw]["spec"]:
wf_dict["fws"][idx_fw]["spec"]["_trackers"].extend(trackers)
else:
wf_dict["fws"][idx_fw]["spec"]["_trackers"] = trackers
return Workflow.from_dict(wf_dict)
def from_dict(cls, m_dict):
return Tracker(m_dict['filename'], m_dict['nlines'], m_dict.get('content', ''),
m_dict.get('allow_zipped', False))
def from_dict(cls, m_dict):
fworker = FWorker.from_dict(m_dict['fworker']) if m_dict['fworker'] else None
action = FWAction.from_dict(m_dict['action']) if m_dict.get('action') else None
trackers = [Tracker.from_dict(f) for f in m_dict['trackers']] if m_dict.get('trackers') else None
return Launch(m_dict['state'], m_dict['launch_dir'], fworker,
m_dict['host'], m_dict['ip'], trackers, action,
m_dict['state_history'], m_dict['launch_id'], m_dict['fw_id'])