Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_events_from_task_dir(self, task_dir):
try:
with open(os.path.join(task_dir, 'events.txt'), 'r') as f:
events = []
for line in f:
if line.rstrip():
name, time_string = line.split(',')
start_time = dsub_util.replace_timezone(
datetime.datetime.strptime(time_string.rstrip(),
'%Y-%m-%d %H:%M:%S.%f'), tzlocal())
events.append({'name': name, 'start-time': start_time})
return events
except (IOError, OSError):
return None
def _get_last_update_time_from_task_dir(self, task_dir):
last_update = 0
for filename in ['status.txt', 'runner-log.txt', 'meta.yaml']:
try:
mtime = os.path.getmtime(os.path.join(task_dir, filename))
last_update = max(last_update, mtime)
except (IOError, OSError):
pass
return dsub_util.replace_timezone(
datetime.datetime.fromtimestamp(last_update),
tzlocal()) if last_update > 0 else None
# the string is coming from the local provider, reading an old version of
# its meta.yaml.
dsub_version = job.get('dsub-version')
if not dsub_version:
return cls._from_yaml_v0(job)
job_metadata = {}
for key in [
'job-id', 'job-name', 'task-ids', 'user-id', 'dsub-version',
'user-project', 'script-name'
]:
if job.get(key) is not None:
job_metadata[key] = job.get(key)
# Make sure that create-time string is turned into a datetime
job_metadata['create-time'] = dsub_util.replace_timezone(
job.get('create-time'), pytz.utc)
job_resources = Resources(logging=job.get('logging'))
job_params = {}
job_params['labels'] = cls._label_params_from_dict(job.get('labels', {}))
job_params['envs'] = cls._env_params_from_dict(job.get('envs', {}))
job_params['inputs'] = cls._input_file_params_from_dict(
job.get('inputs', {}), False)
job_params['input-recursives'] = cls._input_file_params_from_dict(
job.get('input-recursives', {}), True)
job_params['outputs'] = cls._output_file_params_from_dict(
job.get('outputs', {}), False)
job_params['output-recursives'] = cls._output_file_params_from_dict(
job.get('output-recursives', {}), True)
job_params['mounts'] = cls._mount_params_from_dict(job.get('mounts', {}))
last_char = age[-1]
if last_char == 's':
return from_time - datetime.timedelta(seconds=int(age[:-1]))
elif last_char == 'm':
return from_time - datetime.timedelta(minutes=int(age[:-1]))
elif last_char == 'h':
return from_time - datetime.timedelta(hours=int(age[:-1]))
elif last_char == 'd':
return from_time - datetime.timedelta(days=int(age[:-1]))
elif last_char == 'w':
return from_time - datetime.timedelta(weeks=int(age[:-1]))
else:
# If no unit is given treat the age as seconds from epoch, otherwise apply
# the correct time unit.
return dsub_util.replace_timezone(
datetime.datetime.utcfromtimestamp(int(age)), pytz.utc)
except (ValueError, OverflowError) as e:
raise ValueError('Unable to parse age string %s: %s' % (age, e))
job_params['input-recursives'] = cls._input_file_params_from_dict(
job.get('input-recursives', {}), True)
job_params['outputs'] = cls._output_file_params_from_dict(
job.get('outputs', {}), False)
job_params['output-recursives'] = cls._output_file_params_from_dict(
job.get('output-recursives', {}), True)
job_params['mounts'] = cls._mount_params_from_dict(job.get('mounts', {}))
task_descriptors = []
for task in job.get('tasks', []):
task_metadata = {'task-id': task.get('task-id')}
# Old instances of the meta.yaml do not have a task create time.
create_time = task.get('create-time')
if create_time:
task_metadata['create-time'] = dsub_util.replace_timezone(
create_time, pytz.utc)
if task.get('task-attempt') is not None:
task_metadata['task-attempt'] = task.get('task-attempt')
task_params = {}
task_params['labels'] = cls._label_params_from_dict(
task.get('labels', {}))
task_params['envs'] = cls._env_params_from_dict(task.get('envs', {}))
task_params['inputs'] = cls._input_file_params_from_dict(
task.get('inputs', {}), False)
task_params['input-recursives'] = cls._input_file_params_from_dict(
task.get('input-recursives', {}), True)
task_params['outputs'] = cls._output_file_params_from_dict(
task.get('outputs', {}), False)
task_params['output-recursives'] = cls._output_file_params_from_dict(
def _datetime_to_utc_int(date):
"""Convert the integer UTC time value into a local datetime."""
if date is None:
return None
# Convert localized datetime to a UTC integer
epoch = dsub_util.replace_timezone(datetime.utcfromtimestamp(0), pytz.utc)
return (date - epoch).total_seconds()