# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# --- FRAGMENT (leading indentation stripped by extraction): tail of a per-job
# context manager, presumably a file store's open() generator — confirm against
# the full source. NOTE(review): `jobReqs` and `startingDir` are bound before
# this visible span; they appear to be the job's disk request in bytes and the
# directory to return to afterwards — verify upstream.
self.jobStateFile = self._createJobStateFile()
# Warn up front if the worker's filesystem is already nearly full.
freeSpace, diskSize = getFileSystemSize(self.localTempDir)
if freeSpace <= 0.1 * diskSize:
logger.warning('Starting job %s with less than 10%% of disk space remaining.',
self.jobName)
try:
# Run the job body from inside its private temp dir; cwd is restored in finally.
os.chdir(self.localTempDir)
yield
finally:
# Measure disk actually consumed so it can be compared to the request.
diskUsed = getDirSizeRecursively(self.localTempDir)
logString = ("Job {jobName} used {percent:.2f}% ({humanDisk}B [{disk}B] used, "
"{humanRequestedDisk}B [{requestedDisk}B] requested) at the end of "
"its run.".format(jobName=self.jobName,
percent=(float(diskUsed) / jobReqs * 100 if
jobReqs > 0 else 0.0),
humanDisk=bytes2human(diskUsed),
disk=diskUsed,
humanRequestedDisk=bytes2human(jobReqs),
requestedDisk=jobReqs))
self.logToMaster(logString, level=logging.DEBUG)
# Escalate to a WARNING when the job exceeded its disk request.
if diskUsed > jobReqs:
self.logToMaster("Job used more disk than requested. Consider modifying the user "
"script to avoid the chance of failure due to incorrectly "
"requested resources. " + logString, level=logging.WARNING)
os.chdir(startingDir)
# Finally delete the job from the worker
os.remove(self.jobStateFile)
# --- FRAGMENT: middle of an option-registration function (addOptionFn comes
# from an enclosing scope). The first line below is the tail of a help string
# whose opening lines fall outside this visible span.
"memory/cores requested from the batch system.")
# Per-job default resource requirements; each only applies when a job does not
# set the value itself.
addOptionFn('--defaultMemory', dest='defaultMemory', default=None, metavar='INT',
help='The default amount of memory to request for a job. Only applicable to jobs '
'that do not specify an explicit value for this requirement. Standard '
'suffixes like K, Ki, M, Mi, G or Gi are supported. Default is %s' %
bytes2human(config.defaultMemory, symbols='iec'))
addOptionFn('--defaultCores', dest='defaultCores', default=None, metavar='FLOAT',
help='The default number of CPU cores to dedicate a job. Only applicable to jobs '
'that do not specify an explicit value for this requirement. Fractions of a '
'core (for example 0.1) are supported on some batch systems, namely Mesos '
'and singleMachine. Default is %.1f ' % config.defaultCores)
addOptionFn('--defaultDisk', dest='defaultDisk', default=None, metavar='INT',
help='The default amount of disk space to dedicate a job. Only applicable to jobs '
'that do not specify an explicit value for this requirement. Standard '
'suffixes like K, Ki, M, Mi, G or Gi are supported. Default is %s' %
bytes2human(config.defaultDisk, symbols='iec'))
# Guard: the store_true flag below can only turn the setting on, so the config
# default must already be False or the user could never reset it.
assert not config.defaultPreemptable, 'User would be unable to reset config.defaultPreemptable'
addOptionFn('--defaultPreemptable', dest='defaultPreemptable', action='store_true')
# NOTE(review): the K/Ki/M/Mi byte-suffix wording in --maxCores' help looks
# copy-pasted from a memory option; cores are unlikely to take byte suffixes —
# confirm against the option parser before changing the text.
addOptionFn('--maxCores', dest='maxCores', default=None, metavar='INT',
help='The maximum number of CPU cores to request from the batch system at any one '
'time. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. Default '
'is %s' % bytes2human(config.maxCores, symbols='iec'))
addOptionFn('--maxMemory', dest='maxMemory', default=None, metavar='INT',
help="The maximum amount of memory to request from the batch system at any one "
"time. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. Default "
"is %s" % bytes2human(config.maxMemory, symbols='iec'))
addOptionFn('--maxDisk', dest='maxDisk', default=None, metavar='INT',
help='The maximum amount of disk space to request from the batch system at any '
'one time. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. '
'Default is %s' % bytes2human(config.maxDisk, symbols='iec'))
# --- FRAGMENT: a duplicated slice of the same option-registration code. It
# opens mid-expression (two continuation lines of a --defaultDisk help string)
# and is cut off mid-string at the end — both seams are extraction artifacts.
#
'suffixes like K, Ki, M, Mi, G or Gi are supported. Default is %s' %
bytes2human(config.defaultDisk, symbols='iec'))
# Guard: the store_true flag below can only turn the setting on, so the config
# default must already be False or the user could never reset it.
assert not config.defaultPreemptable, 'User would be unable to reset config.defaultPreemptable'
addOptionFn('--defaultPreemptable', dest='defaultPreemptable', action='store_true')
addOptionFn('--maxCores', dest='maxCores', default=None, metavar='INT',
help='The maximum number of CPU cores to request from the batch system at any one '
'time. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. Default '
'is %s' % bytes2human(config.maxCores, symbols='iec'))
addOptionFn('--maxMemory', dest='maxMemory', default=None, metavar='INT',
help="The maximum amount of memory to request from the batch system at any one "
"time. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. Default "
"is %s" % bytes2human(config.maxMemory, symbols='iec'))
addOptionFn('--maxDisk', dest='maxDisk', default=None, metavar='INT',
help='The maximum amount of disk space to request from the batch system at any '
'one time. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. '
'Default is %s' % bytes2human(config.maxDisk, symbols='iec'))
#
# Retrying/rescuing jobs
#
# Rebind addOptionFn to a new option group for job-rescue settings.
addOptionFn = addGroupFn("toil options for rescuing/killing/restarting jobs", \
"The options for jobs that either run too long/fail or get lost \
(some batch systems have issues!)")
addOptionFn("--retryCount", dest="retryCount", default=None,
help=("Number of times to retry a failing job before giving up and "
"labeling job failed. default=%s" % config.retryCount))
addOptionFn("--maxJobDuration", dest="maxJobDuration", default=None,
help=("Maximum runtime of a job (in seconds) before we kill it "
"(this is a lower bound, and the actual time before killing "
"the job may be longer). default=%s" % config.maxJobDuration))
# NOTE(review): the call below is truncated mid-help-string at the fragment
# boundary; the remainder of the statement is outside this visible span.
addOptionFn("--rescueJobsFrequency", dest="rescueJobsFrequency", default=None,
help=("Period of time to wait (in seconds) between checking for "
# --- FRAGMENT: body of a job-issuing method (issueJob-style); the enclosing
# def and its docstring are outside this visible span.
# Build the worker command line: entry point, job name, job store locator,
# and the job's store ID.
jobNode.command = ' '.join((resolveEntryPoint('_toil_worker'),
jobNode.jobName,
self.jobStoreLocator,
jobNode.jobStoreID))
# jobBatchSystemID is an int that is an incremented counter for each job
jobBatchSystemID = self.batchSystem.issueBatchJob(jobNode)
self.jobBatchSystemIDToIssuedJob[jobBatchSystemID] = jobNode
if jobNode.preemptable:
# len(jobBatchSystemIDToIssuedJob) should always be greater than or equal to preemptableJobsIssued,
# so increment this value after the job is added to the issuedJob dict
self.preemptableJobsIssued += 1
# Demote internal CWL bookkeeping jobs to debug-level logging to cut noise.
cur_logger = logger.debug if jobNode.jobName.startswith(CWL_INTERNAL_JOBS) else logger.info
cur_logger("Issued job %s with job batch system ID: "
"%s and cores: %s, disk: %s, and memory: %s",
jobNode, str(jobBatchSystemID), int(jobNode.cores),
bytes2human(jobNode.disk), bytes2human(jobNode.memory))
# Report issuance and current queue depth to metrics, when enabled.
if self.toilMetrics:
self.toilMetrics.logIssuedJob(jobNode)
self.toilMetrics.logQueueSize(self.getNumberOfJobsIssued())
# --- FRAGMENT: another duplicated slice of the option-registration code, cut
# mid-string at both ends by the extraction. The first line below is the tail
# of a --defaultCores help string whose opening falls outside this span.
'and singleMachine. Default is %.1f ' % config.defaultCores)
addOptionFn('--defaultDisk', dest='defaultDisk', default=None, metavar='INT',
help='The default amount of disk space to dedicate a job. Only applicable to jobs '
'that do not specify an explicit value for this requirement. Standard '
'suffixes like K, Ki, M, Mi, G or Gi are supported. Default is %s' %
bytes2human(config.defaultDisk, symbols='iec'))
# Guard: the store_true flag below can only turn the setting on, so the config
# default must already be False or the user could never reset it.
assert not config.defaultPreemptable, 'User would be unable to reset config.defaultPreemptable'
addOptionFn('--defaultPreemptable', dest='defaultPreemptable', action='store_true')
addOptionFn('--maxCores', dest='maxCores', default=None, metavar='INT',
help='The maximum number of CPU cores to request from the batch system at any one '
'time. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. Default '
'is %s' % bytes2human(config.maxCores, symbols='iec'))
addOptionFn('--maxMemory', dest='maxMemory', default=None, metavar='INT',
help="The maximum amount of memory to request from the batch system at any one "
"time. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. Default "
"is %s" % bytes2human(config.maxMemory, symbols='iec'))
addOptionFn('--maxDisk', dest='maxDisk', default=None, metavar='INT',
help='The maximum amount of disk space to request from the batch system at any '
'one time. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. '
'Default is %s' % bytes2human(config.maxDisk, symbols='iec'))
#
# Retrying/rescuing jobs
#
# Rebind addOptionFn to a new option group for job-rescue settings.
addOptionFn = addGroupFn("toil options for rescuing/killing/restarting jobs", \
"The options for jobs that either run too long/fail or get lost \
(some batch systems have issues!)")
addOptionFn("--retryCount", dest="retryCount", default=None,
help=("Number of times to retry a failing job before giving up and "
"labeling job failed. default=%s" % config.retryCount))
# NOTE(review): the call below is truncated mid-help-string at the fragment
# boundary; the remainder of the statement is outside this visible span.
addOptionFn("--maxJobDuration", dest="maxJobDuration", default=None,
help=("Maximum runtime of a job (in seconds) before we kill it "
# --- FRAGMENT (leading indentation stripped): tail of a newer revision of the
# per-job context manager; this version reads the request from
# self.jobDiskBytes rather than a local. NOTE(review): `startingDir` is bound
# before this visible span — presumably the cwd saved on entry; confirm.
try:
# Run the job body from inside its private temp dir; cwd is restored in finally.
os.chdir(self.localTempDir)
yield
finally:
# See how much disk space is used at the end of the job.
# Not a real peak disk usage, but close enough to be useful for warning the user.
# TODO: Push this logic into the abstract file store
diskUsed = getDirSizeRecursively(self.localTempDir)
logString = ("Job {jobName} used {percent:.2f}% ({humanDisk}B [{disk}B] used, "
"{humanRequestedDisk}B [{requestedDisk}B] requested) at the end of "
"its run.".format(jobName=self.jobName,
percent=(float(diskUsed) / self.jobDiskBytes * 100 if
self.jobDiskBytes > 0 else 0.0),
humanDisk=bytes2human(diskUsed),
disk=diskUsed,
humanRequestedDisk=bytes2human(self.jobDiskBytes),
requestedDisk=self.jobDiskBytes))
self.logToMaster(logString, level=logging.DEBUG)
# Escalate to a WARNING when the job exceeded its disk request.
if diskUsed > self.jobDiskBytes:
self.logToMaster("Job used more disk than requested. Please reconsider modifying "
"the user script to avoid the chance of failure due to "
"incorrectly requested resources. " + logString,
level=logging.WARNING)
# Go back up to the per-worker local temp directory.
os.chdir(startingDir)
self.cleanupInProgress = True
# Record that our job is no longer using its space, and clean up
# its temp dir and database entry.
self._deallocateSpaceForJob()
# --- FRAGMENT: the first two lines below are the tail of a docstring whose
# opening triple-quote and def line fall outside this visible span; the rest
# is the body of a job-issuing method (issueJob-style).
Add a job to the queue of jobs
"""
# Build the worker command line: entry point, job name, job store locator,
# and the job's store ID.
jobNode.command = ' '.join((resolveEntryPoint('_toil_worker'),
jobNode.jobName, self.jobStoreLocator, jobNode.jobStoreID))
# The batch system hands back an incrementing integer ID for the issued job.
jobBatchSystemID = self.batchSystem.issueBatchJob(jobNode)
self.jobBatchSystemIDToIssuedJob[jobBatchSystemID] = jobNode
if jobNode.preemptable:
# len(jobBatchSystemIDToIssuedJob) should always be greater than or equal to preemptableJobsIssued,
# so increment this value after the job is added to the issuedJob dict
self.preemptableJobsIssued += 1
# Demote internal CWL bookkeeping jobs to debug-level logging to cut noise.
cur_logger = (logger.debug if jobNode.jobName.startswith(CWL_INTERNAL_JOBS)
else logger.info)
cur_logger("Issued job %s with job batch system ID: "
"%s and cores: %s, disk: %s, and memory: %s",
jobNode, str(jobBatchSystemID), int(jobNode.cores),
bytes2human(jobNode.disk), bytes2human(jobNode.memory))
# Report issuance and current queue depth to metrics, when enabled.
if self.toilMetrics:
self.toilMetrics.logIssuedJob(jobNode)
self.toilMetrics.logQueueSize(self.getNumberOfJobsIssued())
# --- FRAGMENT (leading indentation stripped): another copy of the context
# manager tail that uses a local `jobReqs`. NOTE(review): `freeSpace`,
# `diskSize`, `jobReqs`, and `startingDir` are all bound before this visible
# span — confirm against the full source.
# Warn up front if the worker's filesystem is already nearly full.
if freeSpace <= 0.1 * diskSize:
logger.warning('Starting job %s with less than 10%% of disk space remaining.',
self.jobName)
try:
# Run the job body from inside its private temp dir; cwd is restored in finally.
os.chdir(self.localTempDir)
yield
finally:
# Measure disk actually consumed so it can be compared to the request.
diskUsed = getDirSizeRecursively(self.localTempDir)
logString = ("Job {jobName} used {percent:.2f}% ({humanDisk}B [{disk}B] used, "
"{humanRequestedDisk}B [{requestedDisk}B] requested) at the end of "
"its run.".format(jobName=self.jobName,
percent=(float(diskUsed) / jobReqs * 100 if
jobReqs > 0 else 0.0),
humanDisk=bytes2human(diskUsed),
disk=diskUsed,
humanRequestedDisk=bytes2human(jobReqs),
requestedDisk=jobReqs))
self.logToMaster(logString, level=logging.DEBUG)
# Escalate to a WARNING when the job exceeded its disk request.
if diskUsed > jobReqs:
self.logToMaster("Job used more disk than requested. Consider modifying the user "
"script to avoid the chance of failure due to incorrectly "
"requested resources. " + logString, level=logging.WARNING)
os.chdir(startingDir)
# Finally delete the job from the worker
os.remove(self.jobStateFile)