Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update_spec_force_convergence(spec, user_vasp_settings=None):
fw_spec = spec
update_set = {"ENCUT": 700, "EDIFF": 0.000001, "ALGO":"N", "NPAR":2}
if user_vasp_settings and user_vasp_settings.get("incar"):
update_set.update(user_vasp_settings["incar"])
fw_spec['vasp']['incar'].update(update_set)
old_struct=Poscar.from_dict(fw_spec["vasp"]["poscar"]).structure
if user_vasp_settings and user_vasp_settings.get("kpoints"):
kpoints_density = user_vasp_settings["kpoints"]["kpoints_density"]
else:
kpoints_density = 7000
k=Kpoints.automatic_density(old_struct, kpoints_density)
fw_spec['vasp']['kpoints'] = k.as_dict()
return fw_spec
class SetupFConvergenceTask(FireTaskBase, FWSerializable):
# TODO: This firetask isn't yet used in the EC workflow
_fw_name = "Setup Force Convergence Task"
def run_task(self, fw_spec):
incar = fw_spec['vasp']['incar']
update_set = {"ENCUT": 700, "EDIFF": 0.000001}
incar.update(update_set)
#if fw_spec['double_kmesh']:
kpoints = fw_spec['vasp']['kpoints']
k = [int(round(2.5*k)) if int(round(2.5*k))%2
else int(round(2.5*k))+1 for k in kpoints['kpoints'][0]]
kpoints['kpoints'] = [k]
return FWAction()
class SetupElastConstTask(FireTaskBase, FWSerializable):
class SetupUnconvergedHandlerTask(FireTaskBase, FWSerializable):
"""
Assumes the current directory contains an unconverged job. Fixes it and
runs it
"""
_fw_name = "Unconverged Handler Task"
def run_task(self, fw_spec):
ueh = UnconvergedErrorHandler()
custodian_out = ueh.correct()
return FWAction(stored_data={'error_list': custodian_out['errors']})
class SetupNonSCFTask(FireTaskBase, FWSerializable):
"""
Set up vasp inputs for non-SCF calculations (Uniform [DOS] or band
structure)
"""
_fw_name = "Setup non-SCF Task"
def __init__(self, parameters=None):
"""
:param parameters:
"""
parameters = parameters if parameters else {}
self.update(parameters)
self.line = parameters.get('mode', 'line').lower() == 'line'
self.kpoints_density = parameters.get('kpoints_density', 1000)
self.kpoints_line_density = parameters.get('kpoints_line_density', 20)
# not strictly needed here for pickle/unpickle, but complements __setstate__
def __getstate__(self):
return self.to_dict()
# added to support pickle/unpickle
def __setstate__(self, state):
self.__init__(state)
# added to support pickle/unpickle
def __reduce__(self):
t = defaultdict.__reduce__(self)
return (t[0], (self.to_dict(),), t[2], t[3], t[4])
class FWAction(FWSerializable):
"""
A FWAction encapsulates the output of a Firetask (it is returned by a Firetask after the
Firetask completes). The FWAction allows a user to store rudimentary output data as well
as return commands that alter the workflow.
"""
def __init__(self, stored_data=None, exit=False, update_spec=None, mod_spec=None, additions=None,
detours=None, defuse_children=False, defuse_workflow=False,
propagate=False):
"""
Args:
stored_data (dict): data to store from the run. Does not affect the operation of FireWorks.
exit (bool): if set to True, any remaining Firetasks within the same Firework are skipped.
update_spec (dict): specifies how to update the child FW's spec
mod_spec ([dict]): update the child FW's spec using the DictMod language (more flexible
than update_spec)
def update_spec_force_convergence(spec, user_vasp_settings=None):
fw_spec = spec
update_set = {"ENCUT": 700, "EDIFF": 0.000001, "ALGO":"N", "NPAR":2}
if user_vasp_settings and user_vasp_settings.get("incar"):
update_set.update(user_vasp_settings["incar"])
fw_spec['vasp']['incar'].update(update_set)
old_struct=Poscar.from_dict(fw_spec["vasp"]["poscar"]).structure
if user_vasp_settings and user_vasp_settings.get("kpoints"):
kpoints_density = user_vasp_settings["kpoints"]["kpoints_density"]
else:
kpoints_density = 7000
k=Kpoints.automatic_density(old_struct, kpoints_density)
fw_spec['vasp']['kpoints'] = k.as_dict()
return fw_spec
class SetupFConvergenceTask(FireTaskBase, FWSerializable):
# TODO: This firetask isn't yet used in the EC workflow
_fw_name = "Setup Force Convergence Task"
def run_task(self, fw_spec):
incar = fw_spec['vasp']['incar']
update_set = {"ENCUT": 700, "EDIFF": 0.000001}
incar.update(update_set)
#if fw_spec['double_kmesh']:
kpoints = fw_spec['vasp']['kpoints']
k = [int(round(2.5*k)) if int(round(2.5*k))%2
else int(round(2.5*k))+1 for k in kpoints['kpoints'][0]]
kpoints['kpoints'] = [k]
return FWAction()
class SetupElastConstTask(FireTaskBase, FWSerializable):
def run_task(self, fw_spec):
user_incar_settings= {"NPAR": 2}
if self.line:
MPNonSCFVaspInputSet.from_previous_vasp_run(os.getcwd(), mode="Line", copy_chgcar=False,
user_incar_settings=user_incar_settings, kpoints_line_density=self.kpoints_line_density)
kpath = HighSymmKpath(Poscar.from_file("POSCAR").structure)
return FWAction(stored_data={"kpath": kpath.kpath,
"kpath_name": kpath.name})
else:
MPNonSCFVaspInputSet.from_previous_vasp_run(os.getcwd(), mode="Uniform", copy_chgcar=False,
user_incar_settings=user_incar_settings, kpoints_density=self.kpoints_density)
return FWAction()
class SetupGGAUTask(FireTaskBase, FWSerializable):
"""
Assuming that GGA inputs/outputs already exist in the directory, set up a
GGA+U run.
"""
_fw_name = "Setup GGAU Task"
def run_task(self, fw_spec):
chgcar_start = False
# read the VaspInput from the previous run
poscar = Poscar.from_file(zpath('POSCAR'))
incar = Incar.from_file(zpath('INCAR'))
# figure out what GGA+U values to use and override them
# LDAU values to use
mpvis = MPVaspInputSet()
_fw_name = "Setup Static Task"
def run_task(self, fw_spec):
user_incar_settings={"NPAR":2}
MaterialsProjectStaticVaspInputSet.from_previous_vasp_run(fw_spec["prev_vasp_dir"], user_incar_settings=user_incar_settings)
structure = MaterialsProjectStaticVaspInputSet.get_structure(Vasprun("vasprun.xml"), Outcar("OUTCAR"),
initial_structure=False, refined_structure=True)
# redo POTCAR - this is necessary whenever you change a Structure
# because element order might change!! (learned the hard way...) -AJ
return FWAction(stored_data= {'refined_struct': structure[1].to_dict})
class SetupNonSCFTask(FireTaskBase, FWSerializable):
"""
Set up vasp inputs for non-SCF calculations (Uniform [DOS] or band structure)
"""
_fw_name = "Setup non-SCF Task"
def __init__(self, parameters=None):
"""
:param parameters:
"""
parameters = parameters if parameters else {}
self.update(parameters) # store the parameters explicitly set by the user
self.line = parameters.get('mode', 'line').lower() == 'line'
def run_task(self, fw_spec):
# coding: utf-8
from __future__ import unicode_literals
from fireworks.utilities.fw_serializers import FWSerializable, recursive_serialize, serialize_fw, \
recursive_deserialize
__author__ = 'Anubhav Jain'
__copyright__ = 'Copyright 2014, The Materials Project'
__version__ = '0.1'
__maintainer__ = 'Anubhav Jain'
__email__ = 'ajain@lbl.gov'
__date__ = 'Feb 10, 2014'
class BackgroundTask(FWSerializable, object):
_fw_name = 'BackgroundTask'
def __init__(self, tasks, num_launches=0, sleep_time=60, run_on_finish=False):
"""
:param tasks: [FireTask] - a list of FireTasks to perform
:param num_launches: (int) the total number of times to run the process (0=infinite)
:param sleep_time: (int) sleep time in seconds between background runs
:param run_on_finish (bool): always run this task upon completion of Firework
"""
self.tasks = tasks if isinstance(tasks, (list, tuple)) else [tasks]
self.num_launches = num_launches
self.sleep_time = sleep_time
self.run_on_finish = run_on_finish
@recursive_serialize
# TODO: This firetask isn't yet used in the EC workflow
_fw_name = "Setup Force Convergence Task"
def run_task(self, fw_spec):
incar = fw_spec['vasp']['incar']
update_set = {"ENCUT": 700, "EDIFF": 0.000001}
incar.update(update_set)
#if fw_spec['double_kmesh']:
kpoints = fw_spec['vasp']['kpoints']
k = [int(round(2.5*k)) if int(round(2.5*k))%2
else int(round(2.5*k))+1 for k in kpoints['kpoints'][0]]
kpoints['kpoints'] = [k]
return FWAction()
class SetupElastConstTask(FireTaskBase, FWSerializable):
_fw_name = "Setup Elastic Constant Task"
def run_task(self, fw_spec):
incar = Incar.from_file(zpath("INCAR"))
incar.update({"ISIF": 2})
incar.write_file("INCAR")
return FWAction()
class SetupDeformedStructTask(FireTaskBase, FWSerializable):
_fw_name = "Setup Deformed Struct Task"
def run_task(self, fw_spec):
# Read structure from previous relaxation
relaxed_struct = fw_spec['output']['crystal']
# Generate deformed structures
d_struct_set = DeformedStructureSet(relaxed_struct, ns=0.06)
'_priority': priority,
'_allow_fizzled_parents': True,
'_queueadapter': QA_DB,
'elastic_constant':"deformed_structure",
'clean_task_doc':True,
'deformation_matrix':d_struct_set.deformations[i].tolist(),
'original_task_id':fw_spec["task_id"]}
fws.append(Firework([VaspToDBTask(), AddElasticDataToDBTask()], spec,
name=get_slug(f + '--' + spec['task_type']),
fw_id=-998+i*10))
connections[-999+i*10] = [-998+i*10]
wf.append(Workflow(fws, connections))
return FWAction(additions=wf)
class AddElasticDataToDBTask(FireTaskBase, FWSerializable):
_fw_name = "Add Elastic Data to DB"
def run_task(self, fw_spec):
db_dir = os.environ['DB_LOC']
db_path = os.path.join(db_dir, 'tasks_db.json')
i = fw_spec['original_task_id']
with open(db_path) as f:
db_creds = json.load(f)
connection = MongoClient(db_creds['host'], db_creds['port'])
tdb = connection[db_creds['database']]
tdb.authenticate(db_creds['admin_user'], db_creds['admin_password'])
tasks = tdb[db_creds['collection']]
elasticity = tdb['elasticity']
ndocs = tasks.find({"original_task_id": i,
"state":"successful"}).count()
from collections import defaultdict
import tarfile
from fireworks.core.firework import FireWork
from fireworks.utilities.dict_mods import apply_mod
from fireworks.utilities.fw_serializers import FWSerializable
__author__ = 'Anubhav Jain'
__copyright__ = 'Copyright 2013, The Materials Project'
__version__ = '0.1'
__maintainer__ = 'Anubhav Jain'
__email__ = 'ajain@lbl.gov'
__date__ = 'Feb 27, 2013'
class Workflow(FWSerializable):
class Links(dict, FWSerializable):
@property
def nodes(self):
return self.keys()
@property
def parent_links(self):
# note: if performance of parent_links becomes an issue, override delitem/setitem to ensure it's always
# updated
d = defaultdict(list)
for (parent, children) in self.iteritems():
# add the parents
for child in children:
d[child].append(parent)
return dict(d)