Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def stat(node, path):
    """
    Return owner, group, mode, size and file type of `path` on `node`.

    Runs `stat` on the node, using the `-f` format string when node.os
    is listed in node.OS_FAMILY_BSD and the `-c` format string
    otherwise. Both format strings print five colon-separated fields.

    Returns an empty dict if the command exits non-zero. Otherwise
    returns a dict with the keys 'owner', 'group', 'mode' (four
    character string), 'size' (int) and 'type' (lowercased file type
    text as printed by stat).
    """
    if node.os in node.OS_FAMILY_BSD:
        stat_command = "stat -f '%Su:%Sg:%p:%z:%HT' -- {}"
    else:
        stat_command = "stat -c '%U:%G:%a:%s:%F' -- {}"

    result = node.run(stat_command.format(quote(path)), may_fail=True)
    if result.return_code != 0:
        return {}

    # Five fields are separated by four colons. Limiting the split to
    # four keeps any additional colon inside the trailing file type
    # field instead of producing a sixth element that breaks unpacking.
    owner, group, mode, size, ftype = \
        force_text(result.stdout).strip().split(":", 4)
    mode = mode[-4:].zfill(4)  # cut off BSD file type
    file_stat = {
        'owner': owner,
        'group': group,
        'mode': mode,
        'size': int(size),
        'type': ftype.lower(),
    }
    # Translate the template first and interpolate afterwards, so the
    # lookup is done on the constant message rather than on a string
    # that already contains the path and node name.
    io.debug(_("stat for '{path}' on {node}: {result}").format(
        node=node.name,
        path=path,
        result=repr(file_stat),
    ))
    return file_stat
def get_role(node, role):
    """
    Query pg_authid on `node` for `role` and return its attributes.

    Pipes a SELECT for rolcanlogin, rolsuper and rolpassword through
    `sudo -u postgres psql` and expects one `column|value` pair per
    output line. Column names are translated to attribute names via
    AUTHID_COLUMNS; 'can_login' and 'superuser' are converted to bool
    (True when psql printed "t").

    Returns the attribute dict, or None if no pair could be parsed.
    A failing command is not tolerated here: node.run() is called
    without may_fail.
    """
    # NOTE(review): `role` is interpolated unescaped into both the SQL
    # string literal and the shell command line; callers must only pass
    # trusted role names.
    result = node.run("echo \"SELECT rolcanlogin, rolsuper, rolpassword from pg_authid "
                      "WHERE rolname='{}'\" "
                      "| sudo -u postgres psql -Anqwx -F '|'".format(role))

    role_attrs = {}
    for line in force_text(result.stdout).strip().split("\n"):
        # Split on the first separator only, so a value that itself
        # contains "|" is kept whole instead of being silently dropped.
        key, separator, value = line.partition("|")
        if not separator:
            # not a `column|value` line (e.g. empty output)
            continue
        role_attrs[AUTHID_COLUMNS[key]] = value

    for bool_attr in ('can_login', 'superuser'):
        if bool_attr in role_attrs:
            role_attrs[bool_attr] = role_attrs[bool_attr] == "t"

    return role_attrs if role_attrs else None
def get_role(node, role):
    """
    Query pg_authid on `node` for `role` and return its attributes.

    Pipes a SELECT for rolcanlogin, rolsuper and rolpassword through
    `sudo -u postgres psql` and expects one `column|value` pair per
    output line. Column names are translated to attribute names via
    AUTHID_COLUMNS; 'can_login' and 'superuser' are converted to bool
    (True when psql printed "t").

    Returns the attribute dict. Returns None if the command exits
    non-zero or if no pair could be parsed from its output.
    """
    # NOTE(review): `role` is interpolated unescaped into both the SQL
    # string literal and the shell command line; callers must only pass
    # trusted role names.
    result = node.run("echo \"SELECT rolcanlogin, rolsuper, rolpassword from pg_authid "
                      "WHERE rolname='{}'\" "
                      "| sudo -u postgres psql -Anqwx -F '|'".format(role),
                      may_fail=True)
    if result.return_code != 0:
        return None

    role_attrs = {}
    for line in force_text(result.stdout).strip().split("\n"):
        # Split on the first separator only, so a value that itself
        # contains "|" is kept whole instead of being silently dropped.
        key, separator, value = line.partition("|")
        if not separator:
            # not a `column|value` line (e.g. empty output)
            continue
        role_attrs[AUTHID_COLUMNS[key]] = value

    for bool_attr in ('can_login', 'superuser'):
        if bool_attr in role_attrs:
            role_attrs[bool_attr] = role_attrs[bool_attr] == "t"

    return role_attrs if role_attrs else None
@cached_property
def stderr_text(self):
    """Return self.stderr converted with force_text()."""
    raw_stderr = self.stderr
    return force_text(raw_stderr)
@cached_property
def stdout_text(self):
    """Return self.stdout converted with force_text()."""
    raw_stdout = self.stdout
    return force_text(raw_stdout)
if not skip_name_validation:
self._validate_name(bundle, name)
self.validate_name(bundle, name)
self._validate_attribute_names(bundle, self.id, attributes)
self._validate_required_attributes(bundle, self.id, attributes)
self.validate_attributes(bundle, self.id, attributes)
try:
attributes = self.patch_attributes(attributes)
except FaultUnavailable:
self._faults_missing_for_attributes.add(_("unknown"))
for attribute_name, attribute_default in BUILTIN_ITEM_ATTRIBUTES.items():
normalize = make_normalize(attribute_default)
try:
setattr(self, attribute_name, force_text(normalize(attributes.get(
attribute_name,
copy(attribute_default),
))))
except FaultUnavailable:
self._faults_missing_for_attributes.add(attribute_name)
setattr(self, attribute_name, BUILTIN_ITEM_ATTRIBUTES[attribute_name])
for attribute_name, attribute_default in self.ITEM_ATTRIBUTES.items():
if attribute_name not in BUILTIN_ITEM_ATTRIBUTES:
normalize = make_normalize(attribute_default)
try:
self.attributes[attribute_name] = force_text(normalize(attributes.get(
attribute_name,
copy(attribute_default),
)))
except FaultUnavailable:
def symlink_target(self):
    """
    Return the output of `readlink` for self.path as text.

    Raises ValueError if self.is_symlink is false. The readlink
    command is run with may_fail=True and its return code is not
    inspected; whatever it printed on stdout is stripped and returned.
    """
    if self.is_symlink:
        readlink_result = self.node.run(
            "readlink -- {}".format(quote(self.path)),
            may_fail=True,
        )
        stripped_output = readlink_result.stdout.strip()
        return force_text(stripped_output)
    raise ValueError("{} is not a symlink".format(quote(self.path)))
# NOTE(review): this is the tail of a routine whose header is not visible
# here; ssh_process, stdout_lb, stderr_lb, ignore_failure, command and
# hostname are defined earlier in that routine.
# Log the exit status of the finished SSH process.
io.debug("command finished with return code {}".format(ssh_process.returncode))
# Collect the recorded output streams and the exit status in a RunResult.
result = RunResult()
result.stdout = stdout_lb.record.getvalue()
result.stderr = stderr_lb.record.getvalue()
result.return_code = ssh_process.returncode
# A non-zero exit raises unless ignore_failure is set. Return code 255
# raises even with ignore_failure (presumably because ssh uses 255 for its
# own errors rather than the remote command's -- confirm).
if result.return_code != 0 and (not ignore_failure or result.return_code == 255):
raise RemoteException(_(
"Non-zero return code ({rcode}) running '{command}' on '{host}':\n\n{result}"
).format(
command=command,
host=hostname,
rcode=result.return_code,
# stdout followed by stderr, both converted with force_text()
result=force_text(result.stdout) + force_text(result.stderr),
))
return result
# NOTE(review): this is the tail of a routine whose header is not visible
# here; ssh_process, stdout_lb, stderr_lb, ignore_failure, command, cmd_id
# and hostname are defined earlier in that routine.
# Collect the recorded output streams and the exit status in a RunResult.
result = RunResult()
result.stdout = stdout_lb.record.getvalue()
result.stderr = stderr_lb.record.getvalue()
result.return_code = ssh_process.returncode
# On a non-zero exit the message is built once, always written to the
# debug log, and then reused for the exception if one is raised.
if result.return_code != 0:
error_msg = _(
"Non-zero return code ({rcode}) running '{command}' "
"with ID {id} on '{host}':\n\n{result}\n\n"
).format(
command=command,
host=hostname,
id=cmd_id,
rcode=result.return_code,
# stdout followed by stderr, both converted with force_text()
result=force_text(result.stdout) + force_text(result.stderr),
)
io.debug(error_msg)
# Return code 255 raises even with ignore_failure (presumably because ssh
# uses 255 for its own errors rather than the remote command's -- confirm).
if not ignore_failure or result.return_code == 255:
raise RemoteException(error_msg)
return result
# NOTE(review): this is the tail of a routine whose header is not visible
# here; docker_process, ignore_failure, command, cmd_id and container_id
# are defined earlier in that routine.
# Copy output streams and exit status of the docker process to a RunResult.
result = RunResult()
result.stderr = docker_process.stderr
result.stdout = docker_process.stdout
result.return_code = docker_process.returncode
# On a non-zero exit the message is built once, always written to the
# debug log, and then reused for the exception if one is raised.
if result.return_code != 0:
error_msg = _(
"Non-zero return code ({rcode}) running '{command}' "
"with ID {id} on '{host}':\n\n{result}\n\n"
).format(
command=command,
# the container ID is reported in the message's host slot
host=container_id,
id=cmd_id,
rcode=result.return_code,
# stdout followed by stderr, both converted with force_text()
result=force_text(result.stdout) + force_text(result.stderr),
)
io.debug(error_msg)
# No return code is special-cased here: ignore_failure alone decides
# whether the failure is raised.
if not ignore_failure:
raise RemoteException(error_msg)
return result