Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wrapped(self, *args, **kwargs):
import re
remaining_tries = int(C.ANSIBLE_SSH_RETRIES) + 1
cmd_summary = "%s..." % args[0]
for attempt in range(remaining_tries):
cmd = args[0]
if attempt != 0 and self._play_context.password and isinstance(cmd, list):
# If this is a retry, the fd/pipe for sshpass is closed, and we need a new one
self.sshpass_pipe = os.pipe()
cmd[1] = b'-d' + to_bytes(self.sshpass_pipe[0], nonstring='simplerepr', errors='surrogate_or_strict')
try:
try:
##################################
# override to remove/intercept 'Done' of citrix adc
return_tuple = func(self, *args, **kwargs)
return_tuple = list(return_tuple)
subst = ""
regex = r"(\r\n|\r|\n|)( Done)(\r\n|\r|\n)+"
return_tuple[1] = re.sub(regex, subst, codecs.decode(return_tuple[1]), 0, re.UNICODE)
return_tuple = tuple(return_tuple)
# end
###############################
display.vvv(return_tuple, host=self.host)
# 0 = success
def banner_cowsay(self, msg, color=None):
    """Run ``msg`` through the cowsay executable and display its output.

    :arg msg: text to render; when it contains ``": ["`` every ``[`` is
        removed and a single trailing ``]`` is dropped before rendering.
    :arg color: forwarded unchanged to ``self.display``.

    The command is ``self.b_cowsay -W 60 [-f <cow>] <msg>``.  The ``-f``
    option is only added when ``self.noncow`` is truthy; the special value
    ``'random'`` picks one entry from ``self.cows_available``.
    """
    if u": [" in msg:
        # Strip the bracket decoration before handing the text to cowsay.
        msg = msg.replace(u"[", u"")
        if msg.endswith(u"]"):
            msg = msg[:-1]

    cow_args = [self.b_cowsay, b"-W", b"60"]

    chosen_cow = self.noncow
    if chosen_cow:
        if chosen_cow == 'random':
            chosen_cow = random.choice(list(self.cows_available))
        cow_args.extend([b'-f', to_bytes(chosen_cow)])

    cow_args.append(to_bytes(msg))

    proc = subprocess.Popen(cow_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # stderr is captured but intentionally not shown; only stdout is displayed.
    out, _err = proc.communicate()
    self.display(u"%s\n" % to_text(out), color=color)
def add_all_plugin_dirs(path):
    ''' register every existing plugin sub-directory found under ``path``

    For each loader returned by ``get_all_plugin_loaders()`` that declares a
    ``subdir``, the directory ``<path>/<subdir>`` is added to that loader when
    it exists.  If ``path`` itself is not a directory a warning is displayed
    and nothing is registered.
    '''
    b_root = to_bytes(path, errors='surrogate_or_strict')

    if not os.path.isdir(b_root):
        display.warning("Ignoring invalid path provided to plugin path: '%s' is not a directory" % to_text(path))
        return

    for _name, loader in get_all_plugin_loaders():
        if not loader.subdir:
            # Loaders without a subdir have nothing to look up under path.
            continue
        candidate = os.path.join(b_root, to_bytes(loader.subdir))
        if os.path.isdir(candidate):
            loader.add_directory(to_text(candidate))
def user_and_group(self, path, expand=True):
    """Return the ``(uid, gid)`` owning ``path``.

    :arg path: filesystem path; converted to bytes before use.
    :arg expand: when true, environment variables and then a leading ``~``
        are expanded in the path first.
    :returns: tuple ``(st_uid, st_gid)`` taken from ``os.lstat``, so a
        symbolic link is reported itself rather than its target.
    """
    b_target = to_bytes(path, errors='surrogate_or_strict')
    if expand:
        b_target = os.path.expandvars(b_target)
        b_target = os.path.expanduser(b_target)
    info = os.lstat(b_target)
    return (info.st_uid, info.st_gid)
def fetch_file(self, in_path, out_path):
''' fetch a file from lxc to local '''
super(Connection, self).fetch_file(in_path, out_path)
self._display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.container_name)
in_path = to_bytes(in_path, errors='surrogate_or_strict')
out_path = to_bytes(out_path, errors='surrogate_or_strict')
try:
dst_file = open(out_path, "wb")
except IOError:
traceback.print_exc()
msg = "failed to open output file %s" % out_path
raise errors.AnsibleError(msg)
try:
def write_file(args):
try:
with open(in_path, 'rb') as src_file:
shutil.copyfileobj(src_file, dst_file)
finally:
# this is needed in the lxc child process
# to flush internal python buffers
dst_file.close()
def put_file(self, in_path, out_path):
''' transfer a file from local to lxc '''
super(Connection, self).put_file(in_path, out_path)
display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.lxc)
out_path = shlex_quote(self._prefix_login_path(out_path))
try:
with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as in_file:
if not os.fstat(in_file.fileno()).st_size:
count = ' count=0'
else:
count = ''
try:
p = self._buffered_exec_command('dd of=%s bs=%s%s' % (out_path, BUFSIZE, count), stdin=in_file)
except OSError:
raise AnsibleError("chroot connection requires dd command in the chroot")
try:
stdout, stderr = p.communicate()
except Exception:
traceback.print_exc()
raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
if p.returncode != 0:
raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
except IOError:
def _open_files(self, task_uuid=None):
output_format = self._output_format
output_dir = self._output_dir
for feature in self._features:
data = {
b'counter': to_bytes(self._counter),
b'task_uuid': to_bytes(task_uuid),
b'feature': to_bytes(feature),
b'ext': to_bytes(output_format)
}
if self._files.get(feature):
try:
self._files[feature].close()
except Exception:
pass
if self.write_files:
filename = self._file_name_format % data
self._files[feature] = open(os.path.join(output_dir, filename), 'w+')
if output_format == b'csv':
def _buffered_exec_command(self, cmd, stdin=subprocess.PIPE):
    ''' start a command inside the chroot and hand back the running process

    This exists for the file-transfer methods so that a whole file does not
    have to be read into memory.  Compared to exec_command() it loses some
    niceties, such as being able to return the process's exit code
    immediately.

    :arg cmd: command string passed to the configured executable via ``-c``.
    :arg stdin: object used as the child's standard input.
    :returns: the ``subprocess.Popen`` instance, with stdout and stderr piped;
        the caller is responsible for communicating with it.
    '''
    shell = self.get_option('executable')
    argv = [self.chroot_cmd, self.chroot, shell, '-c', cmd]

    # Log the text form of the command before it is converted to bytes.
    display.vvv("EXEC %s" % argv, host=self.chroot)

    b_argv = []
    for part in argv:
        b_argv.append(to_bytes(part, errors='surrogate_or_strict'))

    return subprocess.Popen(
        b_argv,
        shell=False,
        stdin=stdin,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
:arg version: unicode vault version (for ex, '1.2'). Optional ('1.1' is default)
:arg vault_id: unicode vault identifier. If provided, the version will be bumped to 1.2.
:returns: a byte str that should be dumped into a file. It's
formatted to 80 char columns and has the header prepended
"""
if not cipher_name:
raise AnsibleError("the cipher must be set before adding a header")
version = version or '1.1'
# If we specify a vault_id, use format version 1.2. For no vault_id, stick to 1.1
if vault_id and vault_id != u'default':
version = '1.2'
b_version = to_bytes(version, 'utf-8', errors='strict')
b_vault_id = to_bytes(vault_id, 'utf-8', errors='strict')
b_cipher_name = to_bytes(cipher_name, 'utf-8', errors='strict')
header_parts = [b_HEADER,
b_version,
b_cipher_name]
if b_version == b'1.2' and b_vault_id:
header_parts.append(b_vault_id)
header = b';'.join(header_parts)
b_vaulttext = [header]
b_vaulttext += [b_ciphertext[i:i + 80] for i in range(0, len(b_ciphertext), 80)]
b_vaulttext += [b'']
b_vaulttext = b'\n'.join(b_vaulttext)
to_bytes('Contraseña'),
to_bytes('Contrasenya'),
to_bytes('Hasło'),
to_bytes('Heslo'),
to_bytes('Jelszó'),
to_bytes('Lösenord'),
to_bytes('Mật khẩu'),
to_bytes('Mot de passe'),
to_bytes('Parola'),
to_bytes('Parool'),
to_bytes('Pasahitza'),
to_bytes('Passord'),
to_bytes('Passwort'),
to_bytes('Salasana'),
to_bytes('Sandi'),
to_bytes('Senha'),
to_bytes('Wachtwoord'),
to_bytes('ססמה'),
to_bytes('Лозинка'),
to_bytes('Парола'),
to_bytes('Пароль'),
to_bytes('गुप्तशब्द'),
to_bytes('शब्दकूट'),
to_bytes('సంకేతపదము'),
to_bytes('හස්පදය'),
to_bytes('密码'),
to_bytes('密碼'),
to_bytes('口令'),
]
TASK_ATTRIBUTE_OVERRIDES = (
'become',