def get_id(self, index=0):
    "Return a name, which identifies this job-operation."
    project = self.job._project
    # The full name is designed to be truly unique for each job-operation.
    full_name = '{}%{}%{}%{}'.format(
        project.root_directory(), self.job.get_id(), self.name, index)
    # The job_op_id is a hash computed from the unique full name.
    job_op_id = calc_id(full_name)
    # The actual job id is then constructed from a readable part and the job_op_id,
    # ensuring that the job-op is still somewhat identifiable, but guaranteed to
    # be unique. The readable name is based on the project id, job id, operation name,
    # and the index number. All names and the id itself are restricted in length
    # to guarantee that the id does not get too long.
    max_len = self.MAX_LEN_ID - len(job_op_id)
    if max_len < len(job_op_id):
        raise ValueError("Value for MAX_LEN_ID is too small ({}).".format(self.MAX_LEN_ID))
    readable_name = '{}/{}/{}/{:04d}/'.format(
        str(project)[:12], str(self.job)[:8], self.name[:12], index)[:max_len]
    # By appending the unique job_op_id, we ensure that each id is truly unique.
    return readable_name + job_op_id
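The calc_id helper is not defined in any of these snippets. Assuming it is a stable MD5-style content hash of its argument, a minimal stand-in is enough to show the full pattern end to end: a readable, truncated prefix followed by a unique hash suffix. All names below are illustrative, not part of the library:

import hashlib
import json

MAX_LEN_ID = 92  # hypothetical cap, mirroring self.MAX_LEN_ID in the snippets above

def calc_id_sketch(value):
    # Stand-in for calc_id: a stable MD5 hex digest of the value's JSON form.
    return hashlib.md5(json.dumps(value, sort_keys=True).encode()).hexdigest()

def make_operation_id(root, job_id, op_name, index=0):
    # Hash the full, unique name of the job-operation.
    op_hash = calc_id_sketch('{}%{}%{}%{}'.format(root, job_id, op_name, index))
    # Truncate the readable prefix so the combined id never exceeds MAX_LEN_ID.
    max_len = MAX_LEN_ID - len(op_hash)
    readable = '{}/{}/{:04d}/'.format(job_id[:8], op_name[:12], index)[:max_len]
    return readable + op_hash

For example, make_operation_id('/data/project', '9bfd29df07674bc4', 'initialize') yields an id that begins with the readable prefix '9bfd29df/initialize/0000/' and ends with the 32-character digest.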
def _check_manifest(self):
    "Check whether the manifest file, if it exists, is correct."
    fn_manifest = os.path.join(self._wd, self.FN_MANIFEST)
    try:
        try:
            with open(fn_manifest, 'rb') as file:
                assert calc_id(json.loads(file.read().decode())) == self._id
        except IOError as error:
            if error.errno != errno.ENOENT:
                raise error
    except Exception as error:
        logger.error(
            "State point manifest file of job '{}' appears to be corrupted.".format(self._id))
        raise JobsCorruptedError([self._id])
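The snippet above verifies that the JSON manifest in the job's workspace hashes back to the job id. The writing side is not part of these snippets; assuming the manifest is simply the state point serialized as JSON, a hedged sketch of the counterpart could be:

import json
import os

def create_manifest_sketch(workspace_dir, fn_manifest, statepoint, calc_id):
    # Hypothetical counterpart to _check_manifest: write the state point as JSON
    # so that calc_id(json.loads(...)) reproduces the directory's job id.
    os.makedirs(workspace_dir, exist_ok=True)
    with open(os.path.join(workspace_dir, fn_manifest), 'w') as file:
        json.dump(statepoint, file)
    return calc_id(statepoint)  # the id the workspace directory should be named after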
    See also :meth:`dump_statepoints`.
    """
    if fn is None:
        fn = self.fn(self.FN_STATEPOINTS)
    try:
        tmp = self.read_statepoints(fn=fn)
    except IOError as error:
        if not error.errno == errno.ENOENT:
            raise
        tmp = dict()
    if statepoints is None:
        job_ids = self._job_dirs()
        _cache = {_id: self._get_statepoint(_id) for _id in job_ids}
    else:
        _cache = {calc_id(sp): sp for sp in statepoints}
    tmp.update(_cache)
    logger.debug("Writing state points file with {} entries.".format(len(tmp)))
    with open(fn, 'w') as file:
        file.write(json.dumps(tmp, indent=indent))
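This fragment merges any existing state points file with the current entries before writing, keyed by the hash of each state point. A self-contained sketch of the same merge-then-write pattern, with hypothetical helpers rather than the signac API itself, looks like this:

import errno
import hashlib
import json

def write_statepoints_sketch(fn, statepoints):
    # Hash each state point to its id (stand-in for calc_id).
    def _id(sp):
        return hashlib.md5(json.dumps(sp, sort_keys=True).encode()).hexdigest()

    # Start from the existing file if it is present; a missing file is not an error.
    try:
        with open(fn) as file:
            entries = json.load(file)
    except IOError as error:
        if error.errno != errno.ENOENT:
            raise
        entries = dict()

    # Merge in the new entries and write everything back as indented JSON.
    entries.update({_id(sp): sp for sp in statepoints})
    with open(fn, 'w') as file:
        json.dump(entries, file, indent=2)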
def get_id(self, index=0):
    "Return a name, which identifies this job-operation."
    # The full name is designed to be truly unique for each job-operation.
    full_name = '{}%{}%{}'.format(
        self.project.root_directory(), self.name, index)
    # The op_id is a hash computed from the unique full name.
    op_id = calc_id(full_name)
    # The actual id is then constructed from a readable part and the op_id,
    # ensuring that the operation is still somewhat identifiable, but guaranteed to
    # be unique. The readable name is based on the project id, operation name,
    # and the index number. All names and the id itself are restricted in length
    # to guarantee that the id does not get too long.
    max_len = self.MAX_LEN_ID - len(op_id)
    if max_len < len(op_id):
        raise ValueError("Value for MAX_LEN_ID is too small ({}).".format(self.MAX_LEN_ID))
    readable_name = '{}/{}/{:04d}/'.format(
        str(self.project)[:12], self.name[:12], index)[:max_len]
    # By appending the unique op_id, we ensure that each id is truly unique.
    return readable_name + op_id
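In both get_id variants the readable prefix is sliced to MAX_LEN_ID minus the hash length, so the combined id can never exceed MAX_LEN_ID. A small check with an assumed 32-character digest illustrates the bound:

MAX_LEN_ID = 92          # hypothetical cap
op_id = 'a' * 32         # placeholder for a 32-character MD5-style digest

max_len = MAX_LEN_ID - len(op_id)
readable_name = 'project-name/operation-name-that-is-long/0000/'[:max_len]

# The prefix is sliced to max_len, so the total length is bounded by MAX_LEN_ID.
assert len(readable_name + op_id) <= MAX_LEN_ID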
def _get_statepoint(self, job_id):
    # Cache the state point for this job id and verify that it hashes back
    # to the requested id before returning it.
    sp = self._statepoints.setdefault(job_id, self._read_statepoint(job_id))
    assert calc_id(sp) == job_id
    return sp
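Note that dict.setdefault evaluates its second argument before the lookup, so _read_statepoint above runs even when the id is already cached. If that read is costly, a lazier variant of the same cache-and-verify pattern (a sketch, not the library's code) would be:

def _get_statepoint_lazy(self, job_id):
    # Only touch storage when the id is not already cached.
    try:
        sp = self._statepoints[job_id]
    except KeyError:
        sp = self._statepoints[job_id] = self._read_statepoint(job_id)
    assert calc_id(sp) == job_id
    return sp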
    try:
        self._sp_cache.update(self.read_statepoints(fn=fn_statepoints))
    except IOError as error:
        if error.errno != errno.ENOENT or fn_statepoints is not None:
            raise
    if index is not None:
        for doc in index:
            self._sp_cache[doc['signac_id']] = doc['statepoint']
    corrupted = []
    for job_id in job_ids:
        try:
            # First, check if we can look up the state point.
            sp = self._get_statepoint(job_id)
            # Check if state point and id correspond.
            correct_id = calc_id(sp)
            if correct_id != job_id:
                logger.warning(
                    "The job id of job '{}' is incorrect; "
                    "it should be '{}'.".format(job_id, correct_id))
                invalid_wd = os.path.join(self.workspace(), job_id)
                correct_wd = os.path.join(self.workspace(), correct_id)
                try:
                    os.replace(invalid_wd, correct_wd)
                except OSError as error:
                    logger.critical(
                        "Unable to fix location of job with "
                        "id '{}': '{}'.".format(job_id, error))
                    corrupted.append(job_id)
                    continue
                else:
                    logger.info("Moved job to correct workspace.")
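The repair loop re-derives each job id from its state point and, when the directory name no longer matches, renames the workspace directory to the correct id. A minimal, self-contained sketch of that rename step (hypothetical names, not the signac API) is:

import hashlib
import json
import os

def repair_workspace_sketch(workspace, statepoints):
    # statepoints maps directory names (claimed job ids) to state point dicts.
    corrupted = []
    for claimed_id, sp in statepoints.items():
        # Re-derive the id from the state point (stand-in for calc_id).
        correct_id = hashlib.md5(json.dumps(sp, sort_keys=True).encode()).hexdigest()
        if correct_id == claimed_id:
            continue
        try:
            # Move the directory so its name matches the re-derived id.
            os.replace(os.path.join(workspace, claimed_id),
                       os.path.join(workspace, correct_id))
        except OSError:
            corrupted.append(claimed_id)
    return corrupted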