Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# handle the "parent structure" option, which is used to intentionally force slightly
# different structures to contribute to the same "material", e.g. from an ordering scheme
if "parent_structure" in taskdoc:
t_struct = Structure.from_dict(taskdoc["parent_structure"]["structure"])
q = {"formula_reduced_abc": formula, "parent_structure.spacegroup.number": taskdoc[
"parent_structure"]["spacegroup"]["number"]}
else:
sgnum = taskdoc["output"]["spacegroup"]["number"]
t_struct = Structure.from_dict(taskdoc["output"]["structure"])
q = {"formula_reduced_abc": formula, "sg_number": sgnum}
for m in self._materials.find(q, {"parent_structure": 1, "structure": 1, "material_id": 1}):
s_dict = m["parent_structure"]["structure"] if "parent_structure" in m else m[
"structure"]
m_struct = Structure.from_dict(s_dict)
sm = StructureMatcher(ltol=ltol, stol=stol, angle_tol=angle_tol,
primitive_cell=True, scale=True,
attempt_supercell=False, allow_subset=False,
comparator=ElementComparator())
if sm.fit(m_struct, t_struct):
return m["material_id"]
return None
# for backwards compatibility with older versions of mpmorph
fs_id.append(run["ionic_steps_fs_id"])
fs.append('previous_runs_gfs')
elif "input" in run.keys():
fs_id.append(run["calcs_reversed"][0]["output"]["ionic_steps_fs_id"])
fs.append('structures_fs')
for i, v in enumerate(fs_id):
mmdb = VaspMDCalcDb.from_db_file(db_file, admin=True)
ionic_steps_dict = load_ionic_steps(v, mmdb.db, fs[i])
if i == 0:
trajectory = Trajectory.from_structures([Structure.from_dict(i['structure']) for i in ionic_steps_dict])
else:
trajectory.extend(
Trajectory.from_structures([Structure.from_dict(i['structure']) for i in ionic_steps_dict]))
return trajectory
tag = self["tag"]
db_file = env_chk(self.get("db_file"), fw_spec)
t_step = self.get("t_step", 10)
t_min = self.get("t_min", 0)
t_max = self.get("t_max", 1000)
mesh = self.get("mesh", [20, 20, 20])
eos = self.get("eos", "vinet")
qha_type = self.get("qha_type", "debye_model")
pressure = self.get("pressure", 0.0)
poisson = self.get("poisson", 0.25)
gibbs_summary_dict = {}
mmdb = MMVaspDb.from_db_file(db_file, admin=True)
# get the optimized structure
d = mmdb.collection.find_one({"task_label": "{} structure optimization".format(tag)})
structure = Structure.from_dict(d["calcs_reversed"][-1]["output"]['structure'])
gibbs_summary_dict["structure"] = structure.as_dict()
# get the data(energy, volume, force constant) from the deformation runs
docs = mmdb.collection.find({"task_label": {"$regex": "{} gibbs*".format(tag)},
"formula_pretty": structure.composition.reduced_formula})
energies = []
volumes = []
force_constants = []
for d in docs:
s = Structure.from_dict(d["calcs_reversed"][-1]["output"]['structure'])
energies.append(d["calcs_reversed"][-1]["output"]['energy'])
if qha_type not in ["debye_model"]:
force_constants.append(d["calcs_reversed"][-1]["output"]['force_constants'])
volumes.append(s.volume)
gibbs_summary_dict["energies"] = energies
gibbs_summary_dict["volumes"] = volumes
Args:
docs ([dict]): List of docs. Each doc should have the same
format as one returned from .dft.parse_dir.
Returns:
A list of structures, and a DataFrame with energy and force
data in 'y_orig' column, data type ('energy' or 'force') in
'dtype' column, No. of atoms in 'n' column sharing the same row
of energy data while 'n' being 1 for the rows of force data.
"""
structures, y_orig, n, dtype = [], [], [], []
for d in docs:
if isinstance(d['structure'], dict):
structure = Structure.from_dict(d['structure'])
else:
structure = d['structure']
outputs = d['outputs']
force_arr = np.array(outputs['forces'])
assert force_arr.shape == (len(structure), 3), \
'Wrong force array not matching structure'
force_arr = force_arr.ravel()
y = np.concatenate(([outputs['energy']], force_arr))
y_orig.append(y)
n.append(np.insert(np.ones(len(y) - 1), 0, d['num_atoms']))
dtype.extend(['energy'] + ['force'] * len(force_arr))
structures.append(structure)
df = pd.DataFrame(dict(y_orig=np.concatenate(y_orig), n=np.concatenate(n),
dtype=dtype))
for k, v in kwargs.items():
df[k] = v
(approximately 70k).
Returns:
formula (input)
ehull (output)
References:
If you need a version WITH structures, use generate_data.py to create
mp_all.csv, and use filename="sources/mp_all.csv"
"""
df = pd.read_csv(os.path.join(data_dir, filename))
df = df.drop("mpid", axis=1)
if 'structure' in df.columns.values:
df['structure'] = df['structure'].map(ast.literal_eval).map(Structure.from_dict)
colmap = {'material_id': 'mpid',
'pretty_formula': 'formula',
'band_gap': 'gap pbe',
'e_above_hull': 'ehull',
'elasticity.K_VRH': 'bulkmod',
'elasticity.G_VRH': 'shearmod',
'elasticity.elastic_anisotropy': 'elastic_anisotropy',
'total_magnetization': 'mu_B'}
return df.rename(columns=colmap)
def run_task(self, fw_spec):
ref_struct = self['structure']
d = {
"analysis": {},
"initial_structure": self['structure'].as_dict()
}
# Get optimized structure
calc_locs_opt = [cl for cl in fw_spec.get('calc_locs', []) if 'optimiz' in cl['name']]
if calc_locs_opt:
optimize_loc = calc_locs_opt[-1]['path']
logger.info("Parsing initial optimization directory: {}".format(optimize_loc))
drone = VaspDrone()
optimize_doc = drone.assimilate(optimize_loc)
opt_struct = Structure.from_dict(optimize_doc["calcs_reversed"][0]["output"]["structure"])
d.update({"optimized_structure": opt_struct.as_dict()})
ref_struct = opt_struct
eq_stress = -0.1*Stress(optimize_doc["calcs_reversed"][0]["output"]["ionic_steps"][-1]["stress"])
else:
eq_stress = None
if self.get("fw_spec_field"):
d.update({self.get("fw_spec_field"): fw_spec.get(self.get("fw_spec_field"))})
# Get the stresses, strains, deformations from deformation tasks
defo_dicts = fw_spec["deformation_tasks"].values()
stresses, strains, deformations = [], [], []
for defo_dict in defo_dicts:
stresses.append(Stress(defo_dict["stress"]))
strains.append(Strain(defo_dict["strain"]))
deformations.append(Deformation(defo_dict["deformation_matrix"]))
def write_cfgs(self, filename, cfg_pool):
    """Serialize every configuration in ``cfg_pool`` and write them to ``filename``.

    Args:
        filename: Path of the file to (over)write; opened in ``'w'`` mode.
        cfg_pool: Iterable of dict-like entries. Each entry provides a
            ``'structure'`` (either a dict, which is converted with
            ``Structure.from_dict``, or an object used as-is) and an
            ``'outputs'`` mapping with ``'energy'``, ``'forces'`` and
            ``'virial_stress'`` keys.

    Side effects:
        Sets ``self.specie`` from the first symbol of the last processed
        structure's ``symbol_set``.
    """
    # NOTE(review): the indentation of this snippet was lost; the layout below
    # assumes ``self.specie`` is assigned once after the loop -- confirm. With
    # that layout an empty ``cfg_pool`` leaves ``structure`` unbound.
    lines = []
    for dataset in cfg_pool:
        structure = dataset['structure']
        if isinstance(structure, dict):
            structure = Structure.from_dict(structure)

        outputs = dataset['outputs']
        energy = outputs['energy']
        forces = outputs['forces']
        virial_stress = outputs['virial_stress']
        lines.append(self._line_up(structure, energy, forces, virial_stress))

        # Shortest-distance tracking is currently disabled:
        # dist = np.unique(structure.distance_matrix.ravel())[1]
        # if self.shortest_distance > dist:
        #     self.shortest_distance = dist

    # Only the last structure seen determines the recorded specie.
    self.specie = Element(structure.symbol_set[0])

    content = '\n'.join(lines)
    with open(filename, 'w') as f:
        f.write(content)
def add_structure(self, source, name=None, identifier=None, fmt=None):
"""add a structure to the mpfile"""
from pymatgen import Structure, MPRester
if isinstance(source, Structure):
structure = source
elif isinstance(source, dict):
structure = Structure.from_dict(source)
elif os.path.exists(source):
structure = Structure.from_file(source, sort=True)
elif isinstance(source, six.string_types):
if fmt is None:
raise ValueError("Need fmt to get structure from string!")
structure = Structure.from_str(source, fmt, sort=True)
else:
raise ValueError(source, "not supported!")
if name is not None:
if not isinstance(name, six.string_types):
raise ValueError("structure name needs to be a string")
elif "." in name:
raise ValueError("structure name cannot contain dots (.)")
mpr = MPRester()
def process_traj(data):
    """Build a serialized trajectory from one stored set of ionic steps.

    Args:
        data: Indexable of at least four items ``(index, fs_id, fs, db_file)``.
            ``index`` is handed back unchanged in the result, ``fs_id`` and
            ``fs`` are forwarded to ``load_ionic_steps``, and ``db_file`` is
            used to open the database via ``VaspMDCalcDb.from_db_file``.

    Returns:
        tuple: ``(index, traj_dict)`` where ``index`` is ``data[0]`` and
        ``traj_dict`` is the result of ``Trajectory.as_dict()``.

    Raises:
        IndexError: If no ionic steps are loaded (the first step is needed to
            obtain the lattice and species).
    """
    index, fs_id, fs, db_file = data[0], data[1], data[2], data[3]

    mmdb = VaspMDCalcDb.from_db_file(db_file, admin=True)
    ionic_steps_dict = load_ionic_steps(fs_id, mmdb.db, fs)

    # Lattice and species are taken from the first frame only.
    structure = Structure.from_dict(ionic_steps_dict[0]['structure'])

    # One list of per-site 'abc' coordinates for every frame. The frame loop
    # must not reuse the name holding the caller's index: doing so made the
    # function return the last frame number instead of ``data[0]``.
    positions = [
        [site['abc'] for site in step["structure"]["sites"]]
        for step in ionic_steps_dict
    ]

    # 0.002 is passed as the fourth positional Trajectory argument
    # (presumably the time step -- TODO confirm against the pymatgen version in use).
    traj = Trajectory(structure.lattice.matrix, structure.species, positions, 0.002)
    return index, traj.as_dict()
gfs_id, compression_type = self.insert_gridfs(ionic_steps_json, "structures_fs")
task_doc["calcs_reversed"][0]['output']['ionic_steps_compression'] = compression_type
task_doc["calcs_reversed"][0]['output']['ionic_steps_fs_id'] = gfs_id
# Aggregate a trajectory
## Convert from a list of dictionaries to a dictionary of lists
ionic_steps_dict = task_doc["calcs_reversed"][0]['output']['ionic_steps']
del task_doc["calcs_reversed"][0]['output']['ionic_steps']
ionic_steps_defaultdict = defaultdict(list)
for d in ionic_steps_dict:
for key, val in d.items():
ionic_steps_defaultdict[key].append(val)
ionic_steps = dict(ionic_steps_defaultdict.items())
## extract structures from dictionary
structures = [Structure.from_dict(struct) for struct in ionic_steps['structure']]
del ionic_steps['structure']
frame_properties = {}
for key in ['e_fr_energy', 'e_wo_entrp', 'e_0_energy', 'kinetic', 'lattice kinetic', 'nosepot',
'nosekinetic', 'total']:
frame_properties[key] = ionic_steps[key]
# Create trajectory
trajectory = Trajectory.from_structures(structures, constant_lattice=True,
frame_properties=frame_properties,
time_step=task_doc['input']['incar']['POTIM'])
traj_dict = json.dumps(trajectory.as_dict(), cls=MontyEncoder)
gfs_id, compression_type = self.insert_gridfs(traj_dict, "trajectories_fs")
task_doc['trajectory'] = {
'formula': trajectory[0].composition.formula.replace(' ', ''),