Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
cmd = replace_gnu_args(self.cmd, **format_kwargs)
if '$WID' in cmd or (self.args and '$WID' in self.args):
msg = "Using $WID in the command is deprecated. You should use "\
"the python string format instead. In you case, this means "\
"replacing the $WID in your command by $(WID)."
warnings.warn(msg, DeprecationWarning)
self.cmd = cmd.replace('$WID', str(self.wid))
if self.args is not None:
if isinstance(self.args, string_types):
args = shlex.split(bytestring(replace_gnu_args(self.args,
**format_kwargs)))
else:
args = [bytestring(replace_gnu_args(arg, **format_kwargs))\
for arg in self.args]
args = shlex.split(bytestring(cmd)) + args
else:
args = shlex.split(bytestring(cmd))
logger.debug("process args: %s", args)
return args
and hasattr(self.watcher, option):
format_kwargs[option] = getattr(self.watcher, option)
cmd = replace_gnu_args(self.cmd, **format_kwargs)
if '$WID' in cmd or (self.args and '$WID' in self.args):
msg = "Using $WID in the command is deprecated. You should use "\
"the python string format instead. In you case, this means "\
"replacing the $WID in your command by $(WID)."
warnings.warn(msg, DeprecationWarning)
self.cmd = cmd.replace('$WID', str(self.wid))
if self.args is not None:
if isinstance(self.args, string_types):
args = shlex.split(bytestring(replace_gnu_args(self.args,
**format_kwargs)))
else:
args = [bytestring(replace_gnu_args(arg, **format_kwargs))\
for arg in self.args]
args = shlex.split(bytestring(cmd)) + args
else:
args = shlex.split(bytestring(cmd))
logger.debug("process args: %s", args)
return args
current_env = ObjectDict(self.env.copy())
format_kwargs = {
'wid': self.wid, 'shell': self.shell, 'args': self.args,
'env': current_env, 'working_dir': self.working_dir,
'uid': self.uid, 'gid': self.gid, 'rlimits': self.rlimits,
'executable': self.executable, 'use_fds': self.use_fds}
if self.watcher is not None:
for option in self.watcher.optnames:
if option not in format_kwargs\
and hasattr(self.watcher, option):
format_kwargs[option] = getattr(self.watcher, option)
cmd = replace_gnu_args(self.cmd, **format_kwargs)
if '$WID' in cmd or (self.args and '$WID' in self.args):
msg = "Using $WID in the command is deprecated. You should use "\
"the python string format instead. In you case, this means "\
"replacing the $WID in your command by $(WID)."
warnings.warn(msg, DeprecationWarning)
self.cmd = cmd.replace('$WID', str(self.wid))
if self.args is not None:
if isinstance(self.args, string_types):
args = shlex.split(bytestring(replace_gnu_args(self.args,
**format_kwargs)))
else:
args = [bytestring(replace_gnu_args(arg, **format_kwargs))\
for arg in self.args]
def spawn_process(self, recovery_wid=None):
"""Spawn process.
Return True if ok, False if the watcher must be stopped
"""
if self.is_stopped():
return True
if not recovery_wid and not self.call_hook('before_spawn'):
return False
cmd = util.replace_gnu_args(self.cmd, env=self.env)
nb_tries = 0
# start the redirector now so we can catch any startup errors
if self.stream_redirector:
self.stream_redirector.start()
while nb_tries < self.max_retry or self.max_retry == -1:
process = None
pipe_stdout = self.stdout_stream is not None
pipe_stderr = self.stderr_stream is not None
# noinspection PyPep8Naming
ProcCls = self._process_class
try:
process = ProcCls(self.name, recovery_wid or self._nextwid,
cmd, args=self.args,
def spawn_process(self):
"""Spawn process.
"""
if self.stopped:
return
if not self.call_hook('before_spawn'):
self.stopped = True
return False
cmd = util.replace_gnu_args(self.cmd, sockets=self._get_sockets_fds(),
env=self.env)
self._process_counter += 1
nb_tries = 0
pipe_stdout = self.stdout_redirector is not None
pipe_stderr = self.stderr_redirector is not None
while nb_tries < self.max_retry or self.max_retry == -1:
process = None
try:
process = Process(self._process_counter, cmd,
args=self.args, working_dir=self.working_dir,
shell=self.shell, uid=self.uid, gid=self.gid,
env=self.env, rlimits=self.rlimits,
executable=self.executable,
use_fds=self.use_sockets, watcher=self,
pipe_stdout=pipe_stdout,
def get(self, section, option):
    """Return the value of *option* in *section* after substitution.

    The raw value is fetched through ``StrictConfigParser.get`` and then
    passed to ``replace_gnu_args`` together with ``self._env`` as the
    ``env`` keyword; whatever that helper returns is handed back.
    """
    raw_value = StrictConfigParser.get(self, section, option)
    expanded_value = replace_gnu_args(raw_value, env=self._env)
    return expanded_value
def items(self, section, noreplace=False):
    """Return the ``(key, value)`` pairs of *section*.

    With ``noreplace`` true, the pairs produced by
    ``StrictConfigParser.items`` are returned untouched. Otherwise every
    value is run through ``replace_gnu_args`` with a snapshot of
    ``os.environ`` passed as ``env``, and a new list of pairs is returned.
    """
    raw_pairs = StrictConfigParser.items(self, section)
    if not noreplace:
        # Copy the process environment once so every value is processed
        # against the same mapping.
        environ_snapshot = dict(os.environ)
        substituted = []
        for name, raw in raw_pairs:
            substituted.append(
                (name, replace_gnu_args(raw, env=environ_snapshot)))
        return substituted
    return raw_pairs
watcher_name = watcher['name']
extra = cfg.items(section, noreplace=True)
environs[watcher_name].update(_upper(extra))
for watcher in watchers:
if watcher['name'] in environs:
if not 'env' in watcher:
watcher['env'] = dict()
_extend(watcher['env'], environs[watcher['name']].items())
for option, value in watcher.items():
if option in ('name', 'env'):
continue
if not isinstance(value, str):
continue
watcher[option] = replace_gnu_args(value, env=watcher['env'])
config['watchers'] = watchers
config['plugins'] = plugins
config['sockets'] = sockets
return config