Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.jobsDomain.delete_attributes(item_name=bytes(jobStoreID))
items = None
for attempt in retry_sdb():
with attempt:
items = list(self.filesDomain.select(
consistent_read=True,
query="select version from `%s` where ownerID='%s'" % (
self.filesDomain.name, jobStoreID)))
assert items is not None
if items:
log.debug("Deleting %d file(s) associated with job %s", len(items), jobStoreID)
n = self.itemsPerBatchDelete
batches = [items[i:i + n] for i in range(0, len(items), n)]
for batch in batches:
itemsDict = {item.name: None for item in batch}
for attempt in retry_sdb():
with attempt:
self.filesDomain.batch_delete_attributes(itemsDict)
for item in items:
version = item.get('version')
for attempt in retry_s3():
with attempt:
if version:
self.filesBucket.delete_key(key_name=bytes(item.name), version_id=version)
else:
self.filesBucket.delete_key(key_name=bytes(item.name))
def jobs(self):
    """
    Iterate over every job stored in the jobs domain.

    Performs a consistent-read select of all items, retrying transient
    SDB failures, then yields each item converted back into a job object.
    """
    job_items = None
    for attempt in retry_sdb():
        with attempt:
            select = "select * from `%s`" % self.jobsDomain.name
            job_items = list(self.jobsDomain.select(consistent_read=True,
                                                    query=select))
    assert job_items is not None
    for job_item in job_items:
        yield self._awsJobFromItem(job_item)
def _delete_domain(self, domain):
    """
    Delete the given SDB domain, tolerating an already-missing domain.

    :param domain: the boto SDB domain object to delete
    """
    for attempt in retry_sdb():
        with attempt:
            try:
                domain.delete()
            except SDBResponseError as e:
                # A non-existent domain means there is nothing left to
                # delete; any other error is propagated to the caller.
                if not no_such_sdb_domain(e):
                    raise
def exists(self, jobStoreID):
    """
    Return True if a job with the given ID is present in the jobs domain.

    Requests only the presence-indicator attribute so the consistent
    read stays as cheap as possible.
    """
    for attempt in retry_sdb():
        with attempt:
            attributes = self.jobsDomain.get_attributes(
                item_name=bytes(jobStoreID),
                attribute_name=[SDBHelper.presenceIndicator()],
                consistent_read=True)
            return bool(attributes)
def load(self, jobStoreID):
    """
    Fetch the job with the given ID from the jobs domain.

    :param jobStoreID: ID of the job to load
    :raise NoSuchJobException: if no item exists for the ID, or the item
           cannot be converted back into a job
    :return: the loaded job object
    """
    raw_item = None
    for attempt in retry_sdb():
        with attempt:
            raw_item = self.jobsDomain.get_attributes(compat_bytes(jobStoreID),
                                                      consistent_read=True)
    if not raw_item:
        raise NoSuchJobException(jobStoreID)
    job = self._awsJobFromItem(raw_item)
    if job is None:
        raise NoSuchJobException(jobStoreID)
    log.debug("Loaded job %s", jobStoreID)
    return job
"""
# The weird mapping of the SDB item attribute value to the property value is due to
# backwards compatibility. 'True' becomes True, that's easy. Toil < 3.3.0 writes this at
# the end of job store creation. Absence of either the registry, the item or the
# attribute becomes False, representing a truly absent, non-existing job store. An
# attribute value of 'False', which is what Toil < 3.3.0 writes at the *beginning* of job
# store destruction, indicates a job store in transition, reflecting the fact that 3.3.0
# may leak buckets or domains even though the registry reports 'False' for them. We
# can't handle job stores that were partially created by 3.3.0, though.
registry_domain = self._bindDomain(domain_name='toil-registry',
create=False,
block=False)
if registry_domain is None:
return False
else:
for attempt in retry_sdb():
with attempt:
attributes = registry_domain.get_attributes(item_name=self.namePrefix,
attribute_name='exists',
consistent_read=True)
try:
exists = attributes['exists']
except KeyError:
return False
else:
if exists == 'True':
return True
elif exists == 'False':
return None
else:
assert False
def _readStatsAndLogging(self, callback, ownerId):
    """
    Yield the file info of every stats/logging file owned by *ownerId*.

    For each matching file, its download stream is handed to *callback*
    before the file's info object is yielded to the caller.
    """
    owned_items = None
    for attempt in retry_sdb():
        with attempt:
            select = "select * from `%s` where ownerID='%s'" % (
                self.filesDomain.name, str(ownerId))
            owned_items = list(self.filesDomain.select(consistent_read=True,
                                                       query=select))
    assert owned_items is not None
    for owned_item in owned_items:
        file_info = self.FileInfo.fromItem(owned_item)
        with file_info.downloadStream() as readable:
            callback(readable)
        yield file_info
def create(self, jobNode):
    """
    Create a new job for the given job node and persist it.

    When batching is active (``self._batchedJobGraphs`` exists and is not
    None) the job is queued for a later bulk write; otherwise it is
    written to the jobs domain immediately.

    :return: the newly created job graph object
    """
    jobStoreID = self._newJobID()
    log.debug("Creating job %s for '%s'",
              jobStoreID, '' if jobNode.command is None else jobNode.command)
    job = JobGraph.fromJobNode(jobNode, jobStoreID=jobStoreID,
                               tryCount=self._defaultTryCount())
    # getattr with a None default is equivalent to the hasattr + is-not-None
    # check: missing attribute and explicit None both fall through to the
    # immediate write.
    if getattr(self, "_batchedJobGraphs", None) is not None:
        self._batchedJobGraphs.append(job)
    else:
        item = self._awsJobToItem(job)
        for attempt in retry_sdb():
            with attempt:
                assert self.jobsDomain.put_attributes(job.jobStoreID, item)
    return job
def load(self, jobStoreID):
    """
    Fetch the job with the given ID from the jobs domain.

    :param jobStoreID: ID of the job to load
    :raise NoSuchJobException: if no item exists for the ID, or the item
           cannot be converted back into a job
    :return: the loaded job object
    """
    item = None
    for attempt in retry_sdb():
        with attempt:
            # Use compat_bytes for the item name, matching the other load()
            # in this file: calling bytes() on a str without an encoding
            # raises TypeError on Python 3.
            item = self.jobsDomain.get_attributes(compat_bytes(jobStoreID),
                                                  consistent_read=True)
    if not item:
        raise NoSuchJobException(jobStoreID)
    job = self._awsJobFromItem(item)
    if job is None:
        raise NoSuchJobException(jobStoreID)
    log.debug("Loaded job %s", jobStoreID)
    return job