Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
md_task = []
f_idx = []
with open(ii) as fp:
for ii in fp :
md_task.append(ii.split()[0])
f_idx.append(ii.split()[1])
md_task = md_task[:fp_task_max]
f_idx = f_idx[:fp_task_max]
cc = 0
for tt,ff in zip(md_task, f_idx) :
traj_file = os.path.join(tt, 'traj', '%d.lammpstrj' % int(ff))
poscar_file = os.path.join(fp_path,
'task.%03d.%06d' % (int(sidx), cc),
'POSCAR')
cc += 1
sys0 = dpdata.System(traj_file, fmt = 'lammps/dump', type_map = type_map)
sys1 = dpdata.System(poscar_file, fmt = 'vasp/poscar')
test_atom_names(testCase, sys0, sys1)
with open(log_lammps, 'r') as fp:
if 'Total wall time:' not in fp.read():
warnings.warn("lammps not finished " + log_lammps + " skip")
return None
else:
fp.seek(0)
lines = fp.read().split('\n')
for ii in lines:
if ("Total number of atoms" in ii) and (not 'print' in ii):
natoms = float(ii.split('=')[1].split()[0])
if ("Final energy per atoms" in ii) and (not 'print' in ii):
epa = float(ii.split('=')[1].split()[0])
dump = os.path.join(output_dir, 'dump.relax')
contcar = os.path.join(output_dir, 'CONTCAR')
d_dump = dpdata.System(dump, fmt='lammps/dump')
d_dump.to('vasp/poscar', contcar, frame_idx=-1)
force = d_dump['forces']
result_dict = {"energy": natoms * epa, "force": list(force[-1].reshape(natoms * 3))}
return result_dict
with open(log_lammps, 'r') as fp:
if 'Total wall time:' not in fp.read():
warnings.warn("lammps not finished " + log_lammps + " skip")
return None
else:
fp.seek(0)
lines = fp.read().split('\n')
for ii in lines:
if ("Total number of atoms" in ii) and (not 'print' in ii):
natoms = float(ii.split('=')[1].split()[0])
if ("Final energy per atoms" in ii) and (not 'print' in ii):
epa = float(ii.split('=')[1].split()[0])
dump = os.path.join(output_dir, 'dump.relax')
contcar = os.path.join(output_dir, 'CONTCAR')
d_dump = dpdata.System(dump, fmt='lammps/dump')
d_dump.to('vasp/poscar', contcar, frame_idx=-1)
force = d_dump['forces']
result_dict = {"energy": natoms * epa, "force": list(force[-1].reshape(natoms * 3))}
return result_dict
f_idx = []
with open(ii) as fp:
for ii in fp :
md_task.append(ii.split()[0])
f_idx.append(ii.split()[1])
md_task = md_task[:fp_task_max]
f_idx = f_idx[:fp_task_max]
cc = 0
for tt,ff in zip(md_task, f_idx) :
traj_file = os.path.join(tt, 'traj', '%d.lammpstrj' % int(ff))
poscar_file = os.path.join(fp_path,
'task.%03d.%06d' % (int(sidx), cc),
'POSCAR')
cc += 1
sys0 = dpdata.System(traj_file, fmt = 'lammps/dump', type_map = type_map)
sys1 = dpdata.System(poscar_file, fmt = 'vasp/poscar')
test_atom_names(testCase, sys0, sys1)
md_task = []
f_idx = []
with open(ii) as fp:
for ii in fp :
md_task.append(ii.split()[0])
f_idx.append(ii.split()[1])
md_task = md_task[:fp_task_max]
f_idx = f_idx[:fp_task_max]
cc = 0
for tt,ff in zip(md_task, f_idx) :
traj_file = os.path.join(tt, 'traj', '%d.lammpstrj' % int(ff))
poscar_file = os.path.join(fp_path,
'task.%03d.%06d' % (int(sidx), cc),
'POSCAR')
cc += 1
sys0 = dpdata.System(traj_file, fmt = 'lammps/dump', type_map = type_map)
sys1 = dpdata.System(poscar_file, fmt = 'vasp/poscar')
test_atom_names(testCase, sys0, sys1)
def _make_fake_md(idx, md_descript, atom_types, type_map, ele_temp = None) :
"""
md_descript: list of dimension
[n_sys][n_MD][n_frame]
ele_temp: list of dimension
[n_sys][n_MD]
"""
natoms = len(atom_types)
ntypes = len(type_map)
atom_types = np.array(atom_types, dtype = int)
atom_numbs = [np.sum(atom_types == ii) for ii in range(ntypes)]
sys = dpdata.System()
sys.data['atom_names'] = type_map
sys.data['atom_numbs'] = atom_numbs
sys.data['atom_types'] = atom_types
for sidx,ss in enumerate(md_descript) :
for midx,mm in enumerate(ss) :
nframes = len(mm)
cells = np.random.random([nframes,3,3])
coords = np.random.random([nframes,natoms,3])
sys.data['coords'] = coords
sys.data['cells'] = cells
task_dir = os.path.join('iter.%06d' % idx,
'01.model_devi',
'task.%03d.%06d' % (sidx, midx))
os.makedirs(os.path.join(task_dir, 'traj'), exist_ok = True)
for ii in range(nframes) :
_write_lammps_dump(sys,
work_path = os.path.join(iter_name, fp_name)
fp_tasks = glob.glob(os.path.join(work_path, 'task.*'))
fp_tasks.sort()
if len(fp_tasks) == 0 :
return
system_idx_str = [os.path.basename(ii).split('.')[1] for ii in fp_tasks]
system_idx_str = list(set(system_idx_str))
system_idx_str.sort()
for ii in system_idx_str:
potcars = []
sys_tasks = glob.glob(os.path.join(work_path, 'task.%s.*' % ii))
assert (len(sys_tasks) != 0)
sys_poscar = os.path.join(sys_tasks[0], 'POSCAR')
sys = dpdata.System(sys_poscar, fmt = 'vasp/poscar')
for ele_name in sys['atom_names']:
ele_idx = jdata['type_map'].index(ele_name)
potcars.append(fp_pp_files[ele_idx])
with open(os.path.join(work_path,'POTCAR.%s' % ii), 'w') as fp_pot:
for jj in potcars:
with open(os.path.join(fp_pp_path, jj)) as fp:
fp_pot.write(fp.read())
sys_tasks = glob.glob(os.path.join(work_path, 'task.%s.*' % ii))
cwd = os.getcwd()
for jj in sys_tasks:
os.chdir(jj)
os.symlink(os.path.join('..', 'POTCAR.%s' % ii), 'POTCAR')
os.chdir(cwd)
def make_pwscf(tdir, fp_params, mass_map, fp_pp_path, fp_pp_files, user_input) :
cwd = os.getcwd()
os.chdir(tdir)
sys_data = dpdata.System('POSCAR').data
sys_data['atom_masses'] = mass_map
ret = make_pwscf_input(sys_data, fp_pp_files, fp_params)
open('input', 'w').write(ret)
os.chdir(cwd)
def run_model_devi(iter_index, jdata, mdata, dispatcher):
"""submit dp test tasks"""
iter_name = make_iter_name(iter_index)
work_path = os.path.join(iter_name, model_devi_name)
# generate command
commands = []
tasks = glob.glob(os.path.join(work_path, "task.*"))
run_tasks = [os.path.basename(ii) for ii in tasks]
# get models
models = glob.glob(os.path.join(work_path, "graph*pb"))
model_names = [os.path.basename(ii) for ii in models]
task_model_list = []
for ii in model_names:
task_model_list.append(os.path.join('..', ii))
# get max data size
data_size = max([len(dpdata.System(os.path.join(
task, rest_data_name), fmt="deepmd/npy")) for task in tasks])
# models
commands = []
detail_file_names = []
for ii, mm in enumerate(task_model_list):
detail_file_name = "{prefix}.{ii}".format(
prefix=detail_file_name_prefix,
ii=ii,
)
# TODO: support 0.x?
command = "{python} -m deepmd test -m {model} -s {system} -n {numb_test} -d {detail_file}".format(
python=mdata['python_test_path'],
model=mm,
system=rest_data_name,
numb_test=data_size,
detail_file=detail_file_name,