Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
job_ids = [jid for jid in os.listdir(root) if m.match(jid)]
except OSError as error:
if error.errno == errno.ENOENT:
return
else:
raise
for i, job_id in enumerate(job_ids):
if not m.match(job_id):
continue
doc = {'signac_id': job_id, KEY_PATH: root}
if signac_id_alias:
doc[signac_id_alias] = job_id
fn_sp = os.path.join(root, job_id, fn_statepoint)
with open(fn_sp, 'rb') as file:
sp = json.loads(file.read().decode(encoding))
if statepoint_dict is not None:
statepoint_dict[job_id] = sp
if statepoint_index:
doc[statepoint_index] = sp
else:
doc.update(sp)
if include_job_document:
fn_doc = os.path.join(root, job_id, fn_job_document)
try:
with open(fn_doc, 'rb') as file:
doc.update(json.loads(file.read().decode(encoding)))
except IOError as error:
if error.errno != errno.ENOENT:
raise
yield doc
if job_ids:
def _build_index(self, include_job_document=False):
    """Generate a basic state point index.

    Yields one dictionary per job id returned by ``_find_job_ids()``,
    holding the id under ``_id`` and the associated state point under
    ``statepoint``.

    :param include_job_document: If true, the content of each job's
        document is merged into the yielded dictionary.
    :type include_job_document: bool
    :raises IOError: If a job document file cannot be read for any
        reason other than the file not existing.
    """
    # Reading the document file straight from the workspace is only done
    # when the project uses the stock Job class.
    workspace = None
    if self.Job is Job:
        workspace = self.workspace()

    for job_id in self._find_job_ids():
        entry = {'_id': job_id, 'statepoint': self._get_statepoint(job_id)}
        if include_job_document:
            if workspace is not None:
                # Direct read: load the JSON document from disk without
                # opening the job.
                try:
                    fn_document = os.path.join(
                        workspace, job_id, self.Job.FN_DOCUMENT)
                    with open(fn_document, 'rb') as docfile:
                        raw = docfile.read()
                        entry.update(json.loads(raw.decode()))
                except IOError as error:
                    # A job without a document file contributes nothing.
                    if error.errno != errno.ENOENT:
                        raise
            else:
                entry.update(self.open_job(id=job_id).document)
        yield entry
def read_statepoints(self, fn=None):
    """Read all statepoints from a file.

    :param fn: The filename of the file containing the statepoints,
        defaults to :const:`~signac.contrib.project.Project.FN_STATEPOINTS`.
    :type fn: str
    :returns: The decoded JSON content of the file.

    See also :meth:`dump_statepoints` and :meth:`write_statepoints`.
    """
    # Fall back to the project's default statepoints file when no
    # filename is given.
    path = self.fn(self.FN_STATEPOINTS) if fn is None else fn
    # See comment in write statepoints.
    with open(path, 'r') as stream:
        return json.load(stream)
def _read_cache(self):
    """Read the persistent state point cache (if available).

    The gzip-compressed JSON cache file is loaded and merged into
    ``self._sp_cache``.

    :returns: The content loaded from the cache file, or ``None`` when
        no cache file exists.
    :raises IOError: If reading the cache file fails for any reason
        other than the file not existing.
    """
    logger.debug("Reading cache...")
    t_begin = time.time()
    try:
        with gzip.open(self.fn(self.FN_CACHE), 'rb') as stream:
            payload = stream.read()
            loaded = json.loads(payload.decode())
        self._sp_cache.update(loaded)
    except IOError as error:
        # Only a missing cache file is tolerated.
        if error.errno != errno.ENOENT:
            raise
        logger.debug("No cache file found.")
        return None
    elapsed = time.time() - t_begin
    logger.debug("Read cache in {:.3f} seconds.".format(elapsed))
    return loaded