Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from automatminer.featurization import AutoFeaturizer
from automatminer.preprocessing import DataCleaner, FeatureReducer
from automatminer.automl.adaptors import TPOTAdaptor, SinglePipelineAdaptor
from automatminer.pipeline import MatPipe
from automatminer.utils.ml import AMM_REG_NAME, AMM_CLF_NAME
from automatminer_dev.config import LP
"""
Tasks for dev benchmarking.
"""
# Firetask whose run_task begins by unpacking its inputs from the Firework
# spec. Only that unpacking is visible in this excerpt; the remainder of the
# method is cut off.
@explicit_serialize
class RunPipe(FireTaskBase):
_fw_name = "RunPipe"
def run_task(self, fw_spec):
# Read data from fw_spec
# Entries read directly from the top level of the spec. Every lookup uses
# plain indexing, so (assuming fw_spec is dict-like) a missing key raises.
pipe_config_dict = fw_spec["pipe_config"]
fold = fw_spec["fold"]
kfold_config = fw_spec["kfold_config"]
target = fw_spec["target"]
data_file = fw_spec["data_file"]
clf_pos_label = fw_spec["clf_pos_label"]
problem_type = fw_spec["problem_type"]
# The learner name is nested inside the pipe_config dict ...
learner_name = pipe_config_dict["learner_name"]
# ... whereas the cache setting is a top-level spec entry.
cache = fw_spec["cache"]
# Keyword-argument entries, also nested inside the pipe_config dict.
learner_kwargs = pipe_config_dict["learner_kwargs"]
reducer_kwargs = pipe_config_dict["reducer_kwargs"]
cleaner_kwargs = pipe_config_dict["cleaner_kwargs"]
from fireworks import Firework, LaunchPad, FWAction, FireTaskBase
from fireworks.core.rocket_launcher import launch_rocket
from fireworks.utilities.fw_utilities import explicit_serialize
from old.reference import ref_dict
from turboworks.utils import random_guess
from old.assorted.optimize_task import OptimizeTask
@explicit_serialize
class CalculateTask(FireTaskBase):
    """Firetask that derives one value from three entries of the spec.

    Reads ``A``, ``B`` and ``C`` from ``fw_spec['Structure']`` and publishes
    ``A * B / C`` under ``energy -> good_estimate`` through ``update_spec``.
    """

    _fw_name = "CalculateTask"

    def run_task(self, fw_spec):
        """Compute ``A * B / C`` and push the result into the spec.

        Args:
            fw_spec: Firework spec holding a ``'Structure'`` mapping with the
                keys ``'A'``, ``'B'`` and ``'C'``.

        Returns:
            An ``FWAction`` whose ``update_spec`` carries the computed value.
        """
        inputs = fw_spec['Structure']
        a_val = inputs['A']
        b_val = inputs['B']
        c_val = inputs['C']
        estimate = a_val * b_val / c_val
        # Hand the result to downstream Fireworks via the spec
        return FWAction(update_spec={'energy': {'good_estimate': estimate}})
# Serializable subclass of OptimizeTask. Only the class header and its
# FireWorks name are visible in this excerpt; the body is cut off.
@explicit_serialize
class SkoptimizeTask(OptimizeTask):
_fw_name = "SkoptimizeTask"
from fireworks.core.rocket_launcher import rapidfire
from fireworks.utilities.fw_utilities import explicit_serialize
from rocketsled import MissionControl, OptTask
# FireWorks LaunchPad used by this example, created under the name "rsled"
launchpad = LaunchPad(name="rsled")
opt_label = "opt_default"
# Bundle the launchpad and the optimization label into one mapping
db_info = dict(launchpad=launchpad, opt_label=opt_label)
# Search space: three integer dimensions, each bounded between 1 and 5
x_dim = [(1, 5)] * 3
@explicit_serialize
class ObjectiveFuncTask(FireTaskBase):
    """
    Example Firetask that evaluates the simple objective

        f(x) = x[0] * x[1] / x[2]

    Swap in your own objective here when it is simple enough to run inside
    a single Firework.
    """

    _fw_name = "ObjectiveFuncTask"

    def run_task(self, fw_spec):
        """Read ``_x`` from the spec, evaluate f, and store it as ``_y``.

        Args:
            fw_spec: Firework spec containing the input vector under ``"_x"``.

        Returns:
            An ``FWAction`` that writes the objective value to ``"_y"``.
        """
        point = fw_spec["_x"]
        product = point[0] * point[1]
        objective = product / point[2]
        return FWAction(update_spec={"_y": objective})
structure_dict = fw_spec["structure"]
if self.get("rescale_volume", False):
spec_structure = Structure.from_dict(structure_dict)
scaling_volume = spec_structure.volume * self["rescale_volume"]
spec_structure.scale_lattice(scaling_volume)
structure_dict = spec_structure.as_dict()
_poscar = Poscar(structure_dict)
_poscar.write_file("POSCAR")
return FWAction()
@explicit_serialize
class SaveStructureTask(FireTaskBase):
    """Firetask that loads a CONTCAR from the working directory into the spec.

    The structure parsed from the file is serialized with ``as_dict()`` and
    published under the ``"structure"`` key through ``update_spec``.
    """

    required_params = []
    optional_params = []

    def run_task(self, fw_spec):
        """Parse a CONTCAR file in the current directory and store its structure.

        Args:
            fw_spec: Firework spec (not read by this task).

        Returns:
            An ``FWAction`` whose ``update_spec`` holds the structure dict
            under ``"structure"``.

        Raises:
            FileNotFoundError: If no file in the current directory has
                "CONTCAR" in its name.
        """
        # Only the top-level listing is needed, so take the first os.walk()
        # entry rather than materializing a walk of the whole directory tree.
        _, _, file_names = next(os.walk("."), (".", [], []))
        contcar_files = [name for name in file_names if "CONTCAR" in name]
        if not contcar_files:
            raise FileNotFoundError(
                "No file containing 'CONTCAR' in its name was found in " + os.getcwd()
            )
        # NOTE(review): when several files match, the last one in directory
        # listing order is used (unchanged behavior); that order is not sorted.
        _poscar = Poscar.from_file(filename=contcar_files[-1], check_for_POTCAR=True, read_velocities=True)
        _structure = _poscar.structure.as_dict()
        return FWAction(update_spec={"structure": _structure})
import numpy as np
from pymatgen import Composition
from pymatgen.electronic_structure.bandstructure import BandStructure
from pymatgen.electronic_structure.boltztrap import BoltztrapRunner, BoltztrapAnalyzer
from pymatgen.entries.compatibility import MaterialsProjectCompatibility
from pymatgen.entries.computed_entries import ComputedEntry
__author__ = 'Geoffroy Hautier, Anubhav Jain'
__copyright__ = 'Copyright 2014, The Materials Project'
__version__ = '0.1'
__maintainer__ = 'Anubhav Jain'
__email__ = 'ajain@lbl.gov'
__date__ = 'Feb 24, 2014'
# Firetask registered under the name "Boltztrap Run Task". Only its class
# constants and the opening loops of generate_te_doping are visible in this
# excerpt; the loop bodies are cut off.
class BoltztrapRunTask(FireTaskBase, FWSerializable):
_fw_name = "Boltztrap Run Task"
# NOTE(review): presumably a relaxation time and a lattice thermal
# conductivity; units and usage are not shown in this excerpt. Confirm.
TAU = 1E-14
KAPPAL = 1
def generate_te_doping(self, d):
# Doping types iterated by the outer loop below.
types = ['p', 'n']
target = 'seebeck_doping' # root key for getting all temps, etc
# Two-level mappings whose missing leaves default to 0 (defaultdict(int)).
pf_dict = defaultdict(lambda: defaultdict(int))
zt_dict = defaultdict(lambda: defaultdict(int))
# NOTE(review): the loop variable `type` shadows the builtin of the same name.
for type in types:
for t in d[target][type]: # temperatures
# Fresh accumulators for each (doping type, temperature) pair.
outside_pf_array = []
outside_zt_array = []
for didx, tensor in enumerate(d[target][type][t]): # doping idx
corr_vol = RescaleVolume.of_poscar(poscar_path="./POSCAR", initial_temperature=initial_temperature,
initial_pressure=initial_pressure,
target_pressure=target_pressure,
target_temperature=target_temperature, alpha=alpha, beta=beta)
# Rescale volume based on temperature difference first. Const T will return no volume change:
corr_vol.by_thermo(scale='temperature')
# TO DB ("Rescaled volume due to delta T: ", corr_vol.structure.volume)
# Rescale volume based on pressure difference:
corr_vol.by_thermo(scale='pressure')
# TO DB ("Rescaled volume due to delta P: ", corr_vol.structure.volume)
corr_vol.poscar.write_file("./POSCAR")
# Pass the rescaled volume to Poscar
return FWAction(stored_data=corr_vol.structure.as_dict())
# Firetask that copies each listed file into <calc_home>/<run_name>, creating
# either directory when it does not exist yet. The excerpt ends inside the
# try block; whatever handles a failed copy is not visible here.
@explicit_serialize
class CopyCalsHome(FireTaskBase):
required_params = ["calc_home", "run_name"]
optional_params = ["files"]
def run_task(self, fw_spec):
# File names copied when the optional "files" param is not supplied.
default_list = ["INCAR", "POSCAR", "CONTCAR", "OUTCAR", "POTCAR", "vasprun.xml", "XDATCAR", "OSZICAR", "DOSCAR"]
files = self.get("files", default_list)
calc_home = self["calc_home"]
run_name = self["run_name"]
target_dir = os.path.join(calc_home, run_name)
# os.mkdir creates a single level only, so calc_home's parent must exist.
if not os.path.exists(calc_home):
os.mkdir(calc_home)
if not os.path.exists(target_dir):
os.mkdir(target_dir)
for f in files:
try:
shutil.copy2(f, target_dir)
from atomate.common.firetasks.glue_tasks import PassCalcLocs
import shutil
import numpy as np
from pymatgen.io.vasp.sets import MITMDSet
import os
__author__ = 'Muratahan Aykol '
# TODO: 2. Add option to lead to a production run of specified length after density is found
# TODO: 3. Switch to MPRelax Parameters in MD
# TODO: 4. Database insertion?
# TODO: 5. Parser tasks
@explicit_serialize
class AmorphousMakerTask(FireTaskBase):
"""
Create a constrained-random packed structure from composition and box dimensions.
Required params:
composition (dict): a dict of target composition with integer atom numbers
e.g. {"V":22, "Li":10, "O":75, "B":10}
box_scale (float): all lattice vectors are multiplied by this scalar value.
e.g. edge length of a cubic simulation box.
Optional params:
tol (float): tolerance factor for how close the atoms can get (angstroms).
e.g. tol = 2.0 angstroms
packmol_path (str): path to the packmol executable. Defaults to "packmol"
clean (bool): whether the intermediate files generated are deleted. Defaults to True.
"""
required_params = ["composition", "box_scale"]
optional_params = ["packmol_path", "clean", "tol"]
fws.append(Firework(tasks=[t], name="run0", parents=[], spec=priority_spec))
t = SpawnTask(spawn_count=0, spawn_limit=3, priority_spec=priority_spec)
fws.append(Firework(tasks=[t], name="spawn_0", parents=fws[len(fws)-1], spec=priority_spec))
wf = Workflow(fireworks=fws, name="Empty_WF_01")
return wf
@explicit_serialize
class EmptyTask(FireTaskBase):
    """Placeholder Firetask that performs no work."""

    def run_task(self, fw_spec):
        """Ignore ``fw_spec`` and hand back an empty ``FWAction``."""
        no_op_action = FWAction()
        return no_op_action
# Firetask that builds a list of follow-up Fireworks: while spawn_count is
# below spawn_limit it queues another SpawnTask with the count incremented;
# otherwise, when "diffusion" is set, it queues EmptyTask Fireworks named
# longrun_0..2. The excerpt shows no return statement, so how `fws` is handed
# back is not visible here.
@explicit_serialize
class SpawnTask(FireTaskBase):
required_params = ["spawn_count", "spawn_limit", "priority_spec"]
optional_params = ["diffusion"]
def run_task(self, fw_spec):
spawn_count = self["spawn_count"]
spawn_limit = self["spawn_limit"]
priority_spec = self["priority_spec"]
# "diffusion" is optional and treated as False when absent.
diffusion = self.get("diffusion", False)
fws = []
if spawn_count < spawn_limit:
# Re-spawn this task with the counter advanced by one; all other params
# are passed through unchanged.
fws.append(Firework(SpawnTask(spawn_count=spawn_count+1, spawn_limit=spawn_limit, priority_spec=priority_spec, diffusion=diffusion),
name="spawn_"+ str(spawn_count+1), spec=priority_spec))
else:
if diffusion:
fws.append(Firework(EmptyTask(), name="longrun_0", spec=priority_spec))
# Each later longrun Firework takes the most recently appended one as its
# parent (fws[len(fws)-1] is the last element of the list).
fws.append(Firework(EmptyTask(), name="longrun_1", spec=priority_spec, parents=fws[len(fws)-1]))
fws.append(Firework(EmptyTask(), name="longrun_2", spec=priority_spec, parents=fws[len(fws)-1]))
from automatminer.preprocessing import DataCleaner, FeatureReducer
from automatminer.automl.adaptors import TPOTAdaptor, SinglePipelineAdaptor
from automatminer.pipeline import MatPipe
from automatminer.utils.ml import AMM_REG_NAME, AMM_CLF_NAME
from automatminer.utils.log import initialize_logger, AMM_LOGGER_BASENAME
from automatminer_dev.config import LP
"""
Tasks for dev benchmarking.
"""
# Firetask whose run_task begins by unpacking its inputs from the Firework
# spec. Only that unpacking is visible in this excerpt; the remainder of the
# method is cut off.
@explicit_serialize
class RunPipe(FireTaskBase):
_fw_name = 'RunPipe'
def run_task(self, fw_spec):
# Read data from fw_spec
# Entries read directly from the top level of the spec. Every lookup uses
# plain indexing, so (assuming fw_spec is dict-like) a missing key raises.
pipe_config_dict = fw_spec["pipe_config"]
fold = fw_spec["fold"]
kfold_config = fw_spec["kfold_config"]
target = fw_spec["target"]
data_file = fw_spec["data_file"]
clf_pos_label = fw_spec["clf_pos_label"]
problem_type = fw_spec["problem_type"]
# The learner name is nested inside the pipe_config dict ...
learner_name = pipe_config_dict["learner_name"]
# ... whereas the cache setting is a top-level spec entry.
cache = fw_spec["cache"]
# Keyword-argument entries, also nested inside the pipe_config dict.
learner_kwargs = pipe_config_dict["learner_kwargs"]
reducer_kwargs = pipe_config_dict["reducer_kwargs"]
cleaner_kwargs = pipe_config_dict["cleaner_kwargs"]
autofeaturizer_kwargs = pipe_config_dict["autofeaturizer_kwargs"]
calc_home=calc_home, snap=snap_num, priority_spec=priority_spec)
if snap_num == 0:
name = str(structure.composition.reduced_formula)
temps = self.get("temps", [500, 1000, 1500])
for temp in temps:
_wf = get_wf_density(structure=structure, temperature=temp, pressure_threshold=5,
name=name + "_snap_" + str(snap_num) + '_diffusion_' + str(temp), db_file=db_file,
copy_calcs=copy_calcs, calc_home=calc_home, cool=False, diffusion=True,
priority_spec=priority_spec)
wfs.append(_wf)
return FWAction(additions=wfs)
# Firetask with required params copy_calcs, calc_home and snap_num. The
# visible part of run_task loads an XDATCAR file from the current working
# directory and selects its final structure; the rest of the method is not
# shown in this excerpt.
@explicit_serialize
class DiffusionTask(FireTaskBase):
required_params = ["copy_calcs", "calc_home", "snap_num"]
optional_params = ["temps", "name", "db_file", "priority_spec"]
def run_task(self, fw_spec):
copy_calcs = self["copy_calcs"]
calc_home = self["calc_home"]
snap_num = self["snap_num"]
# Optional params fall back to None / an empty dict when not supplied.
db_file = self.get("db_file", None)
priority_spec = self.get("priority_spec", {})
# Use the gzipped XDATCAR when it exists, otherwise the plain file.
if os.path.exists(os.path.join(os.getcwd(),'XDATCAR.gz')):
xdat = Xdatcar(os.path.join(os.getcwd(),'XDATCAR.gz'))
else:
xdat = Xdatcar(os.path.join(os.getcwd(), 'XDATCAR'))
# Last entry of xdat.structures (index len-1).
structure = xdat.structures[len(xdat.structures)-1]