ttyFlag = '-t' if tty else ''
commandTokens += ['docker', 'exec', '-i', ttyFlag, 'toil_leader']

inputString = kwargs.pop('input', None)
if inputString is not None:
    kwargs['stdin'] = subprocess.PIPE
collectStdout = kwargs.pop('collectStdout', None)
if collectStdout:
    kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.PIPE

logger.debug('Node %s: %s', self.effectiveIP, ' '.join(args))
# quote the arguments so they survive the extra shell layer on the remote side
# (shlex.quote is the modern replacement for the deprecated pipes.quote)
args = list(map(pipes.quote, args))
commandTokens += args
logger.debug('Full command %s', ' '.join(commandTokens))
popen = subprocess.Popen(commandTokens, **kwargs)
stdout, stderr = popen.communicate(input=inputString)
# at this point the process has already exited, no need for a timeout
resultValue = popen.wait()
# ssh has been throwing random 255 errors; ssh itself exits with 255 when it
# hits an error (e.g. a failed connection), rather than the remote command's exit code
if resultValue != 0:
    logger.debug('SSH Error (%s) %s' % (resultValue, stderr))
    raise RuntimeError('Executing the command "%s" on the appliance returned a non-zero '
                       'exit code %s with stdout %s and stderr %s'
                       % (' '.join(args), resultValue, stdout, stderr))
return stdout
def _getJobDetailsFromScontrol(self, slurmJobID):
    args = ['scontrol',
            'show',
            'job',
            str(slurmJobID)]
    process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    job = dict()
    for line in process.stdout:
        values = line.decode('utf-8').strip().split()
        # If job information is not available an error is issued:
        #   slurm_load_jobs error: Invalid job id specified
        # There is no job information, so exit.
        if len(values) > 0 and values[0] == 'slurm_load_jobs':
            return (None, None)
        # Output is in the form of many key=value pairs, multiple pairs on each line
        # and multiple lines in the output. Each pair is pulled out of each line and
        # added to a dictionary.
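        # For example, a line of `scontrol show job` output typically looks like
        # (values here are illustrative only):
        #   JobId=1234 JobName=toil_job UserId=user(1000) GroupId=group(1000) ...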
        for v in values:
            bits = v.split('=')
            # record each key=value pair (tokens without '=' get a None value)
            job[bits[0]] = bits[1] if len(bits) > 1 else None
def getJobExitCode(self, lsfJobID):
    # the task is set as part of the job ID if using getBatchSystemID()
    job, task = (lsfJobID, None)
    if '.' in lsfJobID:
        job, task = lsfJobID.split('.', 1)

    # first try bjobs to find out the job state
    args = ["bjobs", "-l", str(job)]
    logger.debug("Checking job exit code for job via bjobs: "
                 "{}".format(job))
    process = subprocess.Popen(args, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    # bjobs -l wraps long output onto indented continuation lines; join them
    # back together before matching against the status strings below
    output = process.stdout.read().decode('utf-8').replace("\n ", "")
    process_output = output.split('\n')
    started = 0
    for line in process_output:
        if "Done successfully" in line or "Status <DONE>" in line:
            logger.debug("bjobs detected job completed for job: "
                         "{}".format(job))
            return 0
        elif "New job is waiting for scheduling" in line:
            logger.debug("bjobs detected job pending scheduling for "
                         "job: {}".format(job))
            return None
        elif "PENDING REASONS" in line or "Status <PEND>" in line:
            logger.debug("bjobs detected job pending for job: "
                         "{}".format(job))
            return None
def submitJob(self, subLine):
    combinedEnv = self.boss.environment
    combinedEnv.update(os.environ)
    process = subprocess.Popen(subLine, stdout=subprocess.PIPE,
                               env=combinedEnv)
    line = process.stdout.readline().decode('utf-8')
    logger.debug("BSUB: " + line)
    result = int(line.strip().split()[1].strip('<>'))
    logger.debug("Got the job id: {}".format(result))
    return result
def _getJobDetailsFromSacct(self, slurmJobID):
    # SLURM job exit codes are obtained by running sacct.
    args = ['sacct',
            '-n',  # no header
            '-j', str(slurmJobID),  # job
            '--format', 'State,ExitCode',  # specify output columns
            '-P',  # separate columns with pipes
            '-S', '1970-01-01']  # override start time limit
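    # With these flags sacct typically prints one pipe-separated line per job step,
    # e.g. "COMPLETED|0:0" (state, then exit code and signal) -- illustrative only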
    process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout, _ = process.communicate()
    # returncode is only populated once the process has been reaped, so communicate() first
    rc = process.returncode
    if rc != 0:
        # no accounting system or some other error
        return (None, -999)
    for line in stdout.decode('utf-8').splitlines():
        values = line.strip().split('|')
        if len(values) < 2:
            continue
        state, exitcode = values
        logger.debug("sacct job state is %s", state)
        # If Job is in a running state, return None to indicate we don't have an update
        status, signal = [int(n) for n in exitcode.split(':')]
        if signal > 0:
            # A non-zero signal may indicate e.g. an out-of-memory killed job;
            # report it the way a shell would, as 128 + the signal number
            status = 128 + signal
        return (state, status)
def getJobExitCode(self, torqueJobID):
    if self._version == "pro":
        args = ["qstat", "-x", "-f", str(torqueJobID).split('.')[0]]
    elif self._version == "oss":
        args = ["qstat", "-f", str(torqueJobID).split('.')[0]]

    process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    for line in process.stdout:
        # stdout yields bytes; decode before comparing against strings
        line = line.decode('utf-8').strip()
        #logger.debug("getJobExitCode exit status: " + line)
        # Case differences due to PBSPro vs OSS Torque qstat outputs
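        # e.g. the same field may appear as "exit_status = 0" or "Exit_status = 0"
        # depending on the flavor (values illustrative only)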
if line.startswith("failed") or line.startswith("FAILED") and int(line.split()[1]) == 1:
return 1
if line.startswith("exit_status") or line.startswith("Exit_status"):
status = line.split(' = ')[1]
logger.debug('Exit Status: ' + status)
return int(status)
if 'unknown job id' in line.lower():
# some clusters configure Torque to forget everything about just
# finished jobs instantly, apparently for performance reasons
logger.debug('Batch system no longer remembers about job {}'.format(torqueJobID))
# return assumed success; status files should reveal failure
return 0
Add an "privateIP hostname" line to the /etc/hosts file. If destinationIP is given,
do this on the remote machine.
Azure VMs sometimes fail to initialize, causing the appliance to fail.
This error is given:
Failed to obtain the IP address for 'l7d41a19b-15a6-442c-8ba1-9678a951d824';
the DNS service may not be able to resolve it: Name or service not known.
This method is a fix.
:param node: Node to add to /etc/hosts.
:param destinationIP: A remote host's address
"""
cmd = "echo %s %s | sudo tee --append /etc/hosts > /dev/null" % (node.privateIP, node.name)
logger.debug("Running command %s on %s" % (cmd, destinationIP))
if destinationIP:
subprocess.Popen(["ssh", "-oStrictHostKeyChecking=no", "core@%s" % destinationIP, cmd])
else:
subprocess.Popen(cmd, shell=True)