Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_confs(self,
               path_to_work,
               path_to_equi,
               refine=False):
    """Prepare the working directory and starting tasks for an EOS run.

    Parameters
    ----------
    path_to_work
        Directory for the generated tasks. It is made absolute, then
        created if missing; a warning is logged if it already exists.
    path_to_equi
        Directory of the equilibrium configuration. It is made absolute
        and then replaced by ``self.parameter['start_confs_path']`` when
        that key is set and the path exists on disk.
    refine
        When truthy, the task list is built by ``make_refine`` from the
        ``init_from_suffix`` / ``output_suffix`` parameters.

    NOTE(review): this listing was recovered with its indentation
    flattened and it stops inside the ``self.reprod`` branch, so the
    nesting below is a reconstruction and the return value is not visible
    here -- confirm against the full source.
    """
    path_to_work = os.path.abspath(path_to_work)
    if os.path.exists(path_to_work):
        dlog.warning('%s already exists' % path_to_work)
    else:
        os.makedirs(path_to_work)
    path_to_equi = os.path.abspath(path_to_equi)
    # A configured start_confs_path that exists on disk overrides the
    # path_to_equi argument.
    if 'start_confs_path' in self.parameter and os.path.exists(self.parameter['start_confs_path']):
        path_to_equi = os.path.abspath(self.parameter['start_confs_path'])
    # Remember the current directory so it can be restored after task
    # generation.
    cwd = os.getcwd()
    task_list = []
    if refine:
        print('EOS refine starts')
        task_list = make_refine(self.parameter['init_from_suffix'],
                                self.parameter['output_suffix'],
                                path_to_work)
        # NOTE(review): presumably make_refine changes directory; placement
        # of this chdir inside the refine branch is assumed -- confirm.
        os.chdir(cwd)
    if self.reprod:
        print('eos reproduce starts')
        # NOTE(review): the visible listing ends here; the rest of the
        # reproduce branch is not shown.
def make_confs(self,
               path_to_work,
               path_to_equi,
               refine=False):
    """Prepare the working directory and starting tasks for a surface run.

    Parameters
    ----------
    path_to_work
        Directory for the generated tasks. It is made absolute, then
        created if missing; a warning is logged if it already exists.
    path_to_equi
        Directory of the equilibrium configuration. It is made absolute
        and then replaced by ``self.parameter['start_confs_path']`` when
        that key is set and the path exists on disk.
    refine
        Not read in the visible part of this method.

    Raises
    ------
    RuntimeError
        If ``self.reprod`` is truthy and ``'vasp_lmp_path'`` is missing
        from ``self.parameter``.

    NOTE(review): this listing was recovered with its indentation
    flattened and it appears to stop after the reproduce branch, so the
    nesting below is a reconstruction and the return value is not visible
    here -- confirm against the full source.
    """
    path_to_work = os.path.abspath(path_to_work)
    if os.path.exists(path_to_work):
        dlog.warning('%s already exists' % path_to_work)
    else:
        os.makedirs(path_to_work)
    path_to_equi = os.path.abspath(path_to_equi)
    # A configured start_confs_path that exists on disk overrides the
    # path_to_equi argument.
    if 'start_confs_path' in self.parameter and os.path.exists(self.parameter['start_confs_path']):
        path_to_equi = os.path.abspath(self.parameter['start_confs_path'])
    task_list = []
    # Remember the current directory so it can be restored after task
    # generation.
    cwd = os.getcwd()
    if self.reprod:
        print('surface reproduce starts')
        # Reproduction needs the location of earlier results; fail early
        # with an explicit message when it is not configured.
        if 'vasp_lmp_path' not in self.parameter:
            raise RuntimeError("please provide the vasp_lmp_path for reproduction")
        vasp_lmp_path = os.path.abspath(self.parameter['vasp_lmp_path'])
        task_list = reproduce.make_repro(vasp_lmp_path, path_to_work)
        # NOTE(review): placement of this chdir inside the reproduce
        # branch is assumed -- confirm.
        os.chdir(cwd)
nsw = 0
isif = 2
if not ('NSW' in incar and incar.get('NSW') == nsw):
dlog.info("%s setting NSW to %d" % (self.make_input_file.__name__, nsw))
incar['NSW'] = nsw
else:
raise RuntimeError("not supported calculation setting for VASP")
if not ('ISIF' in incar and incar.get('ISIF') == isif):
dlog.info("%s setting ISIF to %d" % (self.make_input_file.__name__, isif))
incar['ISIF'] = isif
elif cal_type == 'static':
nsw = 0
if not ('NSW' in incar and incar.get('NSW') == nsw):
dlog.info("%s setting ISIF to %d" % (self.make_input_file.__name__, nsw))
incar['NSW'] = nsw
else:
raise RuntimeError("not supported calculation type for VASP")
try:
kspacing = incar.get('KSPACING')
except KeyError:
raise RuntimeError("KSPACING must be given in INCAR")
if 'KGAMMA' in incar:
kgamma = incar.get('KGAMMA')
else:
kgamma = False
incar.write_file(os.path.join(output_dir, 'INCAR'))
else:
do_refine = False
suffix = '00'
# generate working directory like mp-xxx/eos_00 if jj['type'] == 'eos'
# handel the exception that the working directory exists
# ...
# determine the suffix: from scratch or refine
# ...
property_type = jj['type']
path_to_equi = os.path.join(ii, 'relaxation')
path_to_work = os.path.join(ii, property_type + '_' + suffix)
if os.path.exists(path_to_work):
dlog.warning('%s already exists' % path_to_work)
else:
os.makedirs(path_to_work)
prop = make_property_instance(jj)
task_list = prop.make_confs(path_to_work, path_to_equi, do_refine)
for kk in task_list:
poscar = os.path.join(kk, 'POSCAR')
inter = make_calculator(inter_param, poscar)
inter.make_potential_files(kk)
dlog.debug(prop.task_type()) ### debug
inter.make_input_file(kk, prop.task_type(), prop.task_param())
prop.post_process(task_list) # generate same KPOINTS file for elastic when doing VASP
for idx,rjob in enumerate(job_list) :
if not job_fin[idx] :
try:
status = rjob.check_status()
except:
ssh_sess = SSHSession(ssh_sess.remote_profile)
for _idx,_rjob in enumerate(job_list):
job_list[_idx] = SlurmJob(ssh_sess, work_path, _rjob.job_uuid)
count_fail = count_fail +1
dlog.info("ssh_sess failed for %d times"%count_fail)
break
if status == JobStatus.terminated :
lcount[idx]+=1
_job_uuid=rjob.remote_root.split('/')[-1]
dlog.info('Job at %s terminated, submit again'% _job_uuid)
dlog.debug('try %s times for %s'% (lcount[idx], _job_uuid))
rjob.submit(task_chunks[idx], command, resources = resources,restart=True)
if lcount[idx]>3:
dlog.info('Too many errors for ! %s ' % _job_uuid)
rjob.download(task_chunks[idx], backward_task_files,back_error=True)
rjob.clean()
job_fin[idx] = True
elif status == JobStatus.finished :
rjob.download(task_chunks[idx], backward_task_files)
rjob.clean()
job_fin[idx] = True
time.sleep(10)
dlog.debug('error count')
dlog.debug(lcount)
# delete path map file when job finish
_pmap.delete()
fp_iters=os.path.join(path,ITERS_PAT)
dlog.debug(fp_iters)
f_fp_iters=glob(fp_iters)
dlog.info("len iterations data: %s"%len(f_fp_iters))
fp_init=os.path.join(path,INIT_PAT)
dlog.debug(fp_init)
f_fp_init=glob(fp_init)
if skip_init:
entries = _parsing_vasp(f_fp_iters,config_info_dict, id_prefix)
dlog.info("len collected data: %s"%len(entries))
else:
dlog.info("len initialization data: %s"%len(f_fp_init))
entries=_parsing_vasp(f_fp_init,config_info_dict, id_prefix,iters=False)
entries.extend(_parsing_vasp(f_fp_iters,config_info_dict, id_prefix))
dlog.info("len collected data: %s"%len(entries))
#print(output)
#print(entries)
dumpfn(entries,output,indent=4)
def job_id(self, values):
    """Store the identity of a submitted job and publish it class-wide.

    ``values`` is a two-item sequence ``(response, queue)``. ``response``
    must provide ``'jobId'`` and ``'jobName'``; its optional ``'status'``
    entry defaults to ``'SUBMITTED'``.

    The job id and name are kept on the instance. The queue and the
    translated status are stored on the class, so they are shared by all
    instances. The job id is also written out through ``self.context``
    under the name ``self.job_id_name``.

    NOTE(review): presumably installed as a property setter; the decorator
    is not visible in this listing.
    """
    response, queue = values
    cls = self.__class__

    self._job_id = response['jobId']
    self._job_name = response['jobName']
    cls._jobQueue = queue

    # Translate the provider-side status before recording it in the shared
    # id -> status map.
    aws_status = response.get('status', 'SUBMITTED')
    cls._job_id_map_status[self._job_id] = self.map_aws_status_to_dpgen_status(aws_status)

    self.context.write_file(self.job_id_name, self._job_id)

    summary = (self._job_id, self._job_name, cls._job_id_map_status, cls._jobQueue)
    dlog.debug("15000, _job_id:%s, _job_name:%s, _map:%s, _Queue:%s" % summary)
def __init__ (self,
              ssh_session,
              local_root,
              job_uuid=None,
              ) :
    """Bind a local work directory to a per-job directory on the remote host.

    Parameters
    ----------
    ssh_session
        Session object providing ``get_session_root()`` and
        ``get_ssh_client()``.
    local_root
        Local working directory; stored as an absolute path.
    job_uuid
        Identifier of an existing job to re-attach to. When falsy, a fresh
        UUID4 string is generated.

    The remote directory ``<session root>/<job_uuid>`` is created on a
    best-effort basis: a failure (for example because the directory already
    exists when re-attaching to a job) is logged at debug level and does
    not abort construction.
    """
    self.local_root = os.path.abspath(local_root)
    self.job_uuid = job_uuid if job_uuid else str(uuid.uuid4())
    self.remote_root = os.path.join(ssh_session.get_session_root(), self.job_uuid)
    dlog.info("local_root is %s"% local_root)
    dlog.info("remote_root is %s"% self.remote_root)
    self.ssh = ssh_session.get_ssh_client()
    # keep ssh alive
    transport = self.ssh.get_transport()
    transport.set_keepalive(60)
    try:
        sftp = self.ssh.open_sftp()
        try:
            sftp.mkdir(self.remote_root)
        finally:
            # Always release the SFTP channel, even when mkdir fails;
            # otherwise every re-attach to an existing job leaks a channel.
            sftp.close()
    except Exception as err:
        # Best-effort: keep going, but leave a trace instead of silently
        # discarding the error. Catching Exception (not a bare except)
        # lets KeyboardInterrupt/SystemExit propagate.
        dlog.debug("could not create remote_root %s: %s" % (self.remote_root, err))
    # open('job_uuid', 'w').write(self.job_uuid)
dlog.info("DP-GEN will use settings in md_incar!")
jdata['md_nstep'] = nsw_steps
except:
pass
## correct element name
temp_elements = []
for ele in jdata['elements']:
temp_elements.append(ele[0].upper() + ele[1:])
jdata['elements'] = temp_elements
dlog.info("Elements are %s"% ' '.join(jdata['elements']))
## Iteration
stage_list = [int(i) for i in jdata['stages']]
for stage in stage_list:
if stage == 1 :
dlog.info("Current stage is 1, relax")
create_path(out_dir)
shutil.copy2(args.PARAM, os.path.join(out_dir, 'param.json'))
if from_poscar :
make_super_cell_poscar(jdata)
else :
make_unit_cell(jdata)
make_super_cell(jdata)
place_element(jdata)
if args.MACHINE is not None:
make_vasp_relax(jdata, mdata)
run_vasp_relax(jdata, mdata, disp)
else:
make_vasp_relax(jdata, {"fp_resources":{}})
elif stage == 2 :
dlog.info("Current stage is 2, perturb and scale")
make_scale(jdata)
forward_task_files,
backward_task_files,
forward_task_deference = True,
mark_failure = False,
outlog = 'log',
errlog = 'err'):
ratio_failure = self.mdata_resources.get("ratio_failue", 0)
while True:
if self.check_all_dispatchers_finished(ratio_failure):
self.clean()
break
self.exception_handling(ratio_failure)
for ii in range(self.nchunks):
dispatcher_status = self.check_dispatcher_status(ii)
if dispatcher_status == "unsubmitted":
dlog.info(self.dispatcher_list[ii]["entity"].ip)
self.dispatcher_list[ii]["entity"].job_handler = self.dispatcher_list[ii]["dispatcher"].submit_jobs(resources,
command,
work_path,
self.task_chunks[ii],
group_size,
forward_common_files,
forward_task_files,
backward_task_files,
forward_task_deference,
outlog,
errlog)
self.dispatcher_list[ii]["entity"].job_record = self.dispatcher_list[ii]["entity"].job_handler["job_record"]
self.dispatcher_list[ii]["dispatcher_status"] = "running"
elif dispatcher_status == "finished" and self.dispatcher_list[ii]["entity"]:
# no jobs in queue, delete current machine
# else add current machine to server_pool