def all_finished(self,
                 job_handler):
    task_chunks = job_handler['task_chunks']
    task_chunks_str = ['+'.join(ii) for ii in task_chunks]
    task_hashes = [sha1(ii.encode('utf-8')).hexdigest() for ii in task_chunks_str]
    job_list = job_handler['job_list']
    job_record = job_handler['job_record']
    command = job_handler['command']
    resources = job_handler['resources']
    outlog = job_handler['outlog']
    errlog = job_handler['errlog']
    backward_task_files = job_handler['backward_task_files']
    dlog.debug('checking jobs')
    nchunks = len(task_chunks)
    for idx in range(nchunks):
        cur_hash = task_hashes[idx]
        rjob = job_list[idx]
        if not job_record.check_finished(cur_hash):
            # chunk not finished according to record
            status = rjob['batch'].check_status()
            job_uuid = rjob['context'].job_uuid
            dlog.debug('checked job %s' % job_uuid)
            if status == JobStatus.terminated:
                job_record.increase_nfail(cur_hash)
                if job_record.check_nfail(cur_hash) > 3:
                    raise RuntimeError('Job %s failed for more than 3 times' % job_uuid)
                dlog.info('job %s terminated, submit again' % job_uuid)
                dlog.debug('try %s times for %s' % (job_record.check_nfail(cur_hash), job_uuid))
                rjob['batch'].submit(task_chunks[idx], command, res=resources, outlog=outlog, errlog=errlog, restart=True)
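The bookkeeping above keys every chunk record on a SHA-1 of the joined task directory names, so a chunk maps to the same record no matter when it is resubmitted. A minimal standalone sketch of that hashing (the task directory names below are made up for illustration):

from hashlib import sha1

# Hypothetical chunks of task directories, standing in for real task paths.
task_chunks = [['task.000', 'task.001'], ['task.002', 'task.003']]

# Same scheme as the snippet above: join each chunk with '+' and hash it.
task_chunks_str = ['+'.join(chunk) for chunk in task_chunks]
task_hashes = [sha1(s.encode('utf-8')).hexdigest() for s in task_chunks_str]

for s, h in zip(task_chunks_str, task_hashes):
    print(h, '<-', s)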
def _submit():
    script_name = self._make_script(job_dirs, cmd, args, res=resources)
    stdin, stdout, stderr = self.block_checkcall('cd %s; sbatch %s' % (self.remote_root, script_name))
    subret = stdout.readlines()
    job_id = subret[0].split()[-1]
    sftp = self.ssh.open_sftp()
    with sftp.open(os.path.join(self.remote_root, 'job_id'), 'w') as fp:
        fp.write(job_id)
    sftp.close()

dlog.debug(restart)
if restart:
    try:
        status = self.check_status()
        dlog.debug(status)
        if status in [JobStatus.unsubmitted, JobStatus.unknown, JobStatus.terminated]:
            dlog.debug('task restart point !!!')
            _submit()
        elif status == JobStatus.waiting:
            dlog.debug('task is waiting')
        elif status == JobStatus.running:
            dlog.debug('task is running')
        else:
            dlog.debug('task is finished')
    except Exception:
        dlog.debug('no job_id file')
        dlog.debug('task restart point !!!')
        _submit()
else:
    dlog.debug('new task!!!')
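The restart branch above depends on the small job_id file dropped in the remote work directory at submission time: a later status check reads it back, and its absence is treated as "never submitted". A minimal local sketch of that pattern (the paths and the squeue query are assumptions for illustration, not the project's actual implementation):

import os
import subprocess

def read_saved_job_id(work_root):
    # Return the saved scheduler job id, or None if no job_id file exists
    # (the "no job_id file" case handled above).
    path = os.path.join(work_root, 'job_id')
    if not os.path.isfile(path):
        return None
    with open(path) as fp:
        return fp.read().strip()

def slurm_state(job_id):
    # Ask squeue for the job's current state; an empty answer usually means
    # the job has already left the queue.
    ret = subprocess.run(['squeue', '-h', '-o', '%T', '-j', job_id],
                         capture_output=True, text=True)
    return ret.stdout.strip() or 'UNKNOWN'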
else:
    job_uuid = job_record.get_uuid(cur_hash)
    dlog.debug("load uuid %s for chunk %s" % (job_uuid, cur_hash))
# communication context, batch system
context = self.context(work_path, self.session, job_uuid)
batch = self.batch(context, uuid_names=self.uuid_names)
rjob = {'context': context, 'batch': batch}
# upload files
if not rjob['context'].check_file_exists('tag_upload'):
    rjob['context'].upload('.',
                           forward_common_files)
    rjob['context'].upload(cur_chunk,
                           forward_task_files,
                           dereference=forward_task_deference)
    rjob['context'].write_file('tag_upload', '')
    dlog.debug('uploaded files for %s' % task_chunks_str[ii])
# submit new or recover old submission
if not submitted:
    rjob['batch'].submit(cur_chunk, command, res=resources, outlog=outlog, errlog=errlog)
    job_uuid = rjob['context'].job_uuid
    dlog.debug('assigned uuid %s for %s' % (job_uuid, task_chunks_str[ii]))
    dlog.info('new submission of %s for chunk %s' % (job_uuid, cur_hash))
else:
    rjob['batch'].submit(cur_chunk, command, res=resources, outlog=outlog, errlog=errlog, restart=True)
    dlog.info('restart from old submission %s for chunk %s' % (job_uuid, cur_hash))
# record job and its remote context
job_list.append(rjob)
ip = None
instance_id = None
if "type" in self.remote_profile:
    if self.remote_profile['type'] == 'ALI':
        ip = self.remote_profile['hostname']

if status == JobStatus.terminated:
    lcount[idx] += 1
    _job_uuid = rjob.remote_root.split('/')[-1]
    dlog.info('Job at %s terminated, submit again' % _job_uuid)
    dlog.debug('try %s times for %s' % (lcount[idx], _job_uuid))
    rjob.submit(task_chunks[idx], command, resources=resources, restart=True)
    if lcount[idx] > 3:
        dlog.info('Too many errors for %s!' % _job_uuid)
        rjob.download(task_chunks[idx], backward_task_files, back_error=True)
        rjob.clean()
        job_fin[idx] = True
elif status == JobStatus.finished:
    rjob.download(task_chunks[idx], backward_task_files)
    rjob.clean()
    job_fin[idx] = True
time.sleep(10)
dlog.debug('error count')
dlog.debug(lcount)
# delete path map file when jobs finish
_pmap.delete()
def __init__(self,
             local_root,
             work_profile,
             job_uuid=None):
    """
    local_root : local directory that holds the task files
    work_profile : profile object exposing get_work_root() for the work side
    job_uuid : reuse an existing job uuid; a fresh uuid4 is generated when None
    """
    assert isinstance(local_root, str)
    self.local_root = os.path.abspath(local_root)
    if job_uuid:
        self.job_uuid = job_uuid
    else:
        self.job_uuid = str(uuid.uuid4())
    self.remote_root = os.path.join(work_profile.get_work_root(), self.job_uuid)
    dlog.debug("local_root is %s" % local_root)
    dlog.debug("remote_root is %s" % self.remote_root)
    os.makedirs(self.remote_root, exist_ok=True)
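A minimal usage sketch for a context built with this constructor; the class and profile names below are stand-ins, and the only assumption about the work profile is that it exposes get_work_root():

import os

class WorkProfileStub:
    # Hypothetical stand-in for the work profile object used above.
    def __init__(self, work_root):
        self._work_root = os.path.abspath(work_root)

    def get_work_root(self):
        return self._work_root

# profile = WorkProfileStub('/tmp/dpgen_work')
# ctx = LocalContext('./my_tasks', profile)                      # fresh uuid4 job dir
# ctx = LocalContext('./my_tasks', profile, job_uuid='old-uuid') # reuse a previous job dir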
def parsing_vasp(path, config_info_dict, skip_init, output=OUTPUT, id_prefix=None):
    fp_iters = os.path.join(path, ITERS_PAT)
    dlog.debug(fp_iters)
    f_fp_iters = glob(fp_iters)
    dlog.info("len iterations data: %s" % len(f_fp_iters))
    fp_init = os.path.join(path, INIT_PAT)
    dlog.debug(fp_init)
    f_fp_init = glob(fp_init)
    if skip_init:
        entries = _parsing_vasp(f_fp_iters, config_info_dict, id_prefix)
        dlog.info("len collected data: %s" % len(entries))
    else:
        dlog.info("len initialization data: %s" % len(f_fp_init))
        entries = _parsing_vasp(f_fp_init, config_info_dict, id_prefix, iters=False)
        entries.extend(_parsing_vasp(f_fp_iters, config_info_dict, id_prefix))
        dlog.info("len collected data: %s" % len(entries))
    # print(output)
    # print(entries)
    dumpfn(entries, output, indent=4)
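A hedged example of driving parsing_vasp; the run directory, the contents of config_info_dict, and the output file name are assumptions for illustration only:

# Collect entries from the iteration data of a dpgen run, skip the
# initialization data, and dump everything to a JSON file.
config_info_dict = {}  # passed through to _parsing_vasp; fill in per your setup
parsing_vasp('path/to/dpgen_run',
             config_info_dict,
             skip_init=True,
             output='dpgen_entries.json',
             id_prefix='my_run')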
    Example of the returned mapping (AWS Batch job id -> JobStatus):
    {'40fb24b2-d0ca-4443-8e3a-c0906ea03622': <JobStatus>,
     '41bda50c-0a23-4372-806c-87d16a680d85': <JobStatus>}
    """
query_dict = {}
if datetime.now().timestamp() > cls._query_next_allow_time:
    cls._query_next_allow_time = datetime.now().timestamp() + cls._query_time_interval
    for status in ['SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING', 'SUCCEEDED', 'FAILED']:
        status_response = cls.batch_client.list_jobs(jobQueue=cls._jobQueue, jobStatus=status, maxResults=cls._query_max_results)
        status_list = status_response.get('jobSummaryList', [])
        for job_dict in status_list:
            query_dict.update({job_dict['jobId']: cls.map_aws_status_to_dpgen_status(job_dict['status'])})
    for job in cls._job_id_map_status:
        cls._job_id_map_status[job] = query_dict.get(job, JobStatus.unknown)
    dlog.debug('20000: _query: %s, _map: %s' % (query_dict, cls._job_id_map_status))
dlog.debug('62000:job_id:%s, _query: %s, _map: %s' % (job_id, query_dict, cls._job_id_map_status))
if job_id:
    return cls._job_id_map_status.get(job_id, query_dict.get(job_id, JobStatus.unknown))
return cls._job_id_map_status
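map_aws_status_to_dpgen_status is not shown here; a plausible sketch of such a mapping (the exact correspondence is an assumption) would translate the AWS Batch states queried above into the JobStatus values used by the dispatcher:

def map_aws_status_to_dpgen_status(aws_status):
    # Assumed correspondence between AWS Batch job states and JobStatus;
    # anything unrecognised falls back to unknown.
    mapping = {
        'SUBMITTED': JobStatus.waiting,
        'PENDING': JobStatus.waiting,
        'RUNNABLE': JobStatus.waiting,
        'STARTING': JobStatus.waiting,
        'RUNNING': JobStatus.running,
        'SUCCEEDED': JobStatus.finished,
        'FAILED': JobStatus.terminated,
    }
    return mapping.get(aws_status, JobStatus.unknown)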
def sub_script(self, job_dirs, cmd, args, res, outlog, errlog):
    """
    Return a command string like the following (without a trailing \n):
    ((cd /home/ec2-user/Ag_init/run_gen/iter.000000/00.train/001 && /usr/bin/dp_train input.json 2>>train.log |tee -a train.log)&& touch tag_0_finished);wait;
    ((cd /home/ec2-user/Ag_init/run_gen/iter.000000/00.train/001 && /usr/bin/dp_frz 2>>train.log |tee -a train.log)&& touch tag_1_finished);wait;
    ((cd /home/ec2-user/Ag_init/run_gen/iter.000000/00.train/003 && /usr/bin/dp_train input.json 2>>train.log |tee -a train.log)&& touch tag_0_finished);wait;
    ((cd /home/ec2-user/Ag_init/run_gen/iter.000000/00.train/003 && /usr/bin/dp_frz 2>>train.log |tee -a train.log)&& touch tag_1_finished);wait;
    """
    if args is None:
        args = []
    multi_command = ""
    for job_dir in job_dirs:
        for idx, t in enumerate(zip_longest(cmd, args, fillvalue='')):
            c_str = f"((cd {self.context.remote_root}/{job_dir} && test -f tag_{idx}_finished || {t[0]} {t[1]} 2>>{errlog} && touch tag_{idx}_finished ) | tee -a {outlog});wait;"
            multi_command += c_str
    dlog.debug("10000, %s" % multi_command)
    return multi_command
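A small self-contained demonstration of the command-string construction above, with made-up directories, commands, and log names; it shows how zip_longest pairs each command with an (optional) argument and how the tag_<idx>_finished files let a resubmitted script skip steps that already completed:

from itertools import zip_longest

remote_root = '/home/ec2-user/work'       # hypothetical remote root
job_dirs = ['task.000', 'task.001']
cmd = ['dp_train input.json', 'dp_frz']   # commands as in the docstring example
args = []                                 # no extra per-command arguments
outlog, errlog = 'train.log', 'train.err'

multi_command = ''
for job_dir in job_dirs:
    for idx, t in enumerate(zip_longest(cmd, args, fillvalue='')):
        # Skip the command if its tag file already exists, otherwise run it
        # and create the tag so the next restart can skip this step.
        multi_command += (
            f"((cd {remote_root}/{job_dir} && "
            f"test -f tag_{idx}_finished || {t[0]} {t[1]} 2>>{errlog} "
            f"&& touch tag_{idx}_finished ) | tee -a {outlog});wait;"
        )
print(multi_command)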