Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# If path is absolute, use that. Else if it is a tilde expansion, throw an error.
# Otherwise, use sandbox as relative root.
normalized_input_path = os.path.normpath(path)
if (os.path.isabs(normalized_input_path)):
final_path = normalized_input_path
elif (normalized_input_path.startswith('~/') or normalized_input_path == '~'):
raise context.CommandError(EXIT_INVALID_PARAMETER, ScpCommand.TILDE_USAGE_ERROR_MSG % path)
else:
sandbox_path_pre_format = DistributedCommandRunner.thermos_sandbox(
api.cluster,
executor_sandbox=context.options.executor_sandbox)
thermos_namespace = ThermosContext(
task_id=assigned.taskId,
ports=assigned.assignedPorts)
sandbox_path = String(sandbox_path_pre_format) % Environment(thermos=thermos_namespace)
# Join the individual folders to the sandbox path to build safely
final_path = os.path.join(str(sandbox_path), *normalized_input_path.split(os.sep))
return '%s@%s:%s' % (role, slave_host, final_path)
def interpolate_cmd(task, cmd):
    """
    Fill in the template variables of a command using values from an assigned task.

    The command is wrapped in a String and interpolated against an Environment that
    exposes a ``thermos`` namespace (task id and assigned ports) and a ``mesos``
    namespace (instance id).

    :param task: Assigned task passed from Mesos Agent
    :param cmd: Command defined inside shell_command inside config.
    :return: Interpolated cmd with filled in values, for example ports.
    """
    thermos_bindings = ThermosContext(task_id=task.taskId, ports=task.assignedPorts)
    mesos_bindings = MesosContext(instance=task.instanceId)
    interpolated = String(cmd) % Environment(thermos=thermos_bindings, mesos=mesos_bindings)
    return interpolated.get()
def substitute(cls, command, task, cluster, **kw):
    """
    Prefix a command with a ``cd`` into the sandbox and interpolate its template variables.

    :param command: Command text to run; it is appended to the ``cd <sandbox>;`` prefix.
    :param task: Object whose ``assignedTask`` supplies the task id, assigned ports and
                 instance id used for interpolation.
    :param cluster: Cluster handed to ``cls.thermos_sandbox`` to obtain the sandbox path.
    :param kw: Extra keyword arguments forwarded unchanged to ``cls.thermos_sandbox``.
    :return: The result of ``.get()`` on the interpolated String.
    """
    # Resolve the sandbox first so the whole command (prefix included) gets interpolated.
    sandbox_prefix = 'cd %s;' % cls.thermos_sandbox(cluster, **kw)
    bindings = Environment(
        thermos=ThermosContext(
            task_id=task.assignedTask.taskId,
            ports=task.assignedTask.assignedPorts),
        mesos=MesosContext(instance=task.assignedTask.instanceId))
    return (String(sandbox_prefix + command) % bindings).get()
def job(self):
    """
    Interpolate the stored job with this config's context, typecheck it and convert it.

    :return: The value returned by ``convert_pystachio_to_thrift`` for the interpolated
             job and ``self._packages``.
    :raises self.InvalidConfig: If the typecheck fails; carries the typecheck message.
    """
    bound_job = self._job % self.context()
    # {{mesos.instance}} is the only free variable that gets unwrapped at the Task level,
    # so the typecheck runs against a copy with a dummy instance bound; the job that is
    # converted below is the one without that dummy binding.
    dummy_instance = Environment(mesos=Environment(instance=0))
    check_result = bound_job.bind(dummy_instance).check()
    if check_result.ok():
        return convert_pystachio_to_thrift(bound_job, self._packages)
    raise self.InvalidConfig(check_result.message())
def context(self, instance=None):
    """
    Build the Environment that carries the ``mesos`` bindings.

    :param instance: Value to bind as ``instance``; the binding is omitted when None.
    :return: Environment whose ``mesos`` entry is a MesosContext built from the
             specified values only.
    """
    bindings = {}
    # Test against None instead of truthiness: 0 is falsy but is still an explicitly
    # supplied instance and must not be filtered out as "unspecified".
    if instance is not None:
        bindings['instance'] = instance
    return Environment(mesos=MesosContext(bindings))
def _read_task(self, memoized={}):
    """Read the corresponding task from disk and return a ThermosTask. Memoizes already-read tasks.

    NOTE: the mutable default for ``memoized`` is created once, so unless a caller passes
    its own mapping the same dict is reused on every call and acts as the cache.

    :param memoized: Mapping from task id to the already-interpolated task.
    :return: The cached or freshly read task, or None when the task file does not exist
             or could not be read.
    """
    task_id = self._task_id
    if task_id in memoized:
        return memoized[task_id]

    task_path = self._pathspec.given(state=self.type).getpath('task_path')
    if not os.path.exists(task_path):
        return None

    wrapper = ThermosTaskWrapper.from_file(task_path)
    if wrapper is None:
        log.error('Error reading ThermosTask from %s in observer.', task_path)
        return None

    thermos_context = self.context(task_id)
    if not thermos_context:
        # Only a warning: the task is still interpolated and cached below.
        log.warning('Task not yet available: %s', task_id)
    interpolated = wrapper.task() % Environment(thermos=thermos_context)
    memoized[task_id] = interpolated
    return interpolated
def context(self, instance=None):
    """
    Build the Environment that carries the ``mesos`` bindings.

    ``role`` and ``cluster`` come from ``self.role()`` and ``self.cluster()`` and are
    bound only when they are truthy.

    :param instance: Value to bind as ``instance``; the binding is omitted when None.
    :return: Environment whose ``mesos`` entry is a MesosContext built from the
             specified values only.
    """
    candidates = (('role', self.role()), ('cluster', self.cluster()))
    # Filter unspecified values
    bindings = dict((key, value) for key, value in candidates if value)
    # Test against None instead of truthiness: 0 is falsy but is still an explicitly
    # supplied instance and must not be filtered out as "unspecified".
    if instance is not None:
        bindings['instance'] = instance
    return Environment(mesos=MesosContext(bindings))