Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update_access_group(self):
    """
    Modify the existing volume access group identified by ``self.group_id``.

    The virtual network id and tags, initiators, volumes and attributes held on
    this object are passed to ``self.sfe.modify_volume_access_group``. Any
    exception raised while doing so is reported through
    ``self.module.fail_json`` together with the formatted traceback.
    """
    try:
        # Attribute reads stay inside the try so they are covered by the
        # same error reporting as the API call itself.
        changes = dict(
            volume_access_group_id=self.group_id,
            virtual_network_id=self.virtual_network_id,
            virtual_network_tags=self.virtual_network_tags,
            initiators=self.initiators,
            volumes=self.volumes,
            attributes=self.attributes,
        )
        self.sfe.modify_volume_access_group(**changes)
    except Exception as err:
        message = "Error updating volume access group %s: %s" % (self.access_group_name, to_native(err))
        self.module.fail_json(msg=message, exception=traceback.format_exc())
def query_metadata(metadata_url, headers=None, expect_json=False):
    """ Fetch metadata from the provided metadata_url

        Args:
            metadata_url (str): metadata url
            headers (dict): headers to set for metadata request
            expect_json (bool): does the metadata_url return json
        Returns:
            dict or list: the decoded JSON body when expect_json is true,
                otherwise a list with one stripped native string per line
        Raises:
            OpenShiftFactsMetadataUnavailableError: the response status is
                anything other than 200
    """
    response, info = fetch_url(module, metadata_url, headers=headers)  # noqa: F405
    if info['status'] != 200:
        raise OpenShiftFactsMetadataUnavailableError("Metadata unavailable")

    if not expect_json:
        # Plain-text endpoint: one entry per response line.
        return [to_native(raw_line.strip()) for raw_line in response.readlines()]

    body = response.read()
    return module.from_json(to_native(body))  # noqa: F405
# NOTE(review): fragment - the enclosing method header and the `try:` that
# these `except` clauses belong to are outside this view.
# Start the move task; which object is moved depends on parent_type.
if parent_type == 'folder':
# Move ESXi host from folder to folder
# Here the host's parent object (self.host_update.parent) is what gets moved.
task = self.folder.MoveIntoFolder_Task([self.host_update.parent])
elif parent_type == 'cluster':
# The host is put into maintenance mode before the move is requested.
self.put_host_in_maintenance_mode(self.host_update)
# Move ESXi host from cluster to folder
task = self.folder.MoveIntoFolder_Task([self.host_update])
# Each fault below is reported through fail_json with a fault-specific message.
except vim.fault.DuplicateName as duplicate_name:
self.module.fail_json(
msg="The folder already contains an object with the specified name : %s" %
to_native(duplicate_name)
)
except vim.fault.InvalidFolder as invalid_folder:
self.module.fail_json(
msg="The parent of this folder is in the list of objects : %s" %
to_native(invalid_folder)
)
except vim.fault.InvalidState as invalid_state:
self.module.fail_json(
msg="Failed to move host, this can be due to either of following :"
" 1. The host is not part of the same datacenter, 2. The host is not in maintenance mode : %s" %
to_native(invalid_state)
)
except vmodl.fault.NotSupported as not_supported:
self.module.fail_json(
msg="The target folder is not a host folder : %s" %
to_native(not_supported)
)
except vim.fault.DisallowedOperationOnFailoverHost as failover_host:
self.module.fail_json(
msg="The host is configured as a failover host : %s" %
to_native(failover_host)
def _set_server_attributes(self, server):
    """Publish the attributes of ``server`` as inventory host variables.

    Every variable is stored on the inventory host named ``server.name`` and
    each value is converted with ``to_native`` first. ``ansible_host`` is only
    set when the ``connect_with`` option is one of ``public_ipv4``,
    ``hostname`` or ``ipv4_dns_ptr``.
    """
    def publish(variable, value):
        # Single place that binds a value to this server's inventory host.
        self.inventory.set_variable(server.name, variable, to_native(value))

    publish("id", server.id)
    publish("name", server.name)
    publish("status", server.status)
    publish("type", server.server_type.name)

    # Network
    public_net = server.public_net
    publish("ipv4", public_net.ipv4.ip)
    publish("ipv6_network", public_net.ipv6.network)
    publish("ipv6_network_mask", public_net.ipv6.network_mask)

    # Address Ansible connects to; any other option value leaves it unset.
    connect_with = self.get_option("connect_with")
    if connect_with == "public_ipv4":
        publish("ansible_host", server.public_net.ipv4.ip)
    elif connect_with == "hostname":
        publish("ansible_host", server.name)
    elif connect_with == "ipv4_dns_ptr":
        publish("ansible_host", server.public_net.ipv4.dns_ptr)

    # Server Type
    # NOTE(review): "server_type" is populated from server.image.name while
    # "type" above holds server.server_type.name - confirm this is intended.
    publish("server_type", server.image.name)

    # Datacenter
    publish("datacenter", server.datacenter.name)
    publish("location", server.datacenter.location.name)
def get_id_from_display_name(module, manager_url, mgr_username, mgr_password, validate_certs, endpoint, display_name, exit_if_not_found=True):
    """Return the id of the first object at ``endpoint`` with the given display name.

    Args:
        module: object providing ``fail_json`` for error reporting.
        manager_url (str): base URL; ``endpoint`` is appended to it.
        mgr_username (str): username passed to ``request`` as ``url_username``.
        mgr_password (str): password passed to ``request`` as ``url_password``.
        validate_certs (bool): forwarded to ``request``.
        endpoint (str): path appended to ``manager_url``.
        display_name (str): value compared against each result's
            ``display_name`` entry.
        exit_if_not_found (bool): when true, call ``module.fail_json`` if no
            result matches.

    Returns:
        The ``id`` entry of the first matching result, or ``None`` when nothing
        matches and ``exit_if_not_found`` is false.
    """
    try:
        (_rc, resp) = request(manager_url + endpoint, headers=dict(Accept='application/json'),
                              url_username=mgr_username, url_password=mgr_password,
                              validate_certs=validate_certs, ignore_errors=True)
    except Exception as err:
        # presumably fail_json does not return (it exits the module) - otherwise
        # `resp` would be unbound below; verify against the module implementation.
        module.fail_json(msg='Error accessing id for display name %s. Error [%s]' % (display_name, to_native(err)))

    for result in resp['results']:
        # Membership test instead of calling __contains__ directly; entries
        # without a display_name are skipped.
        if 'display_name' in result and result['display_name'] == display_name:
            return result['id']

    if exit_if_not_found:
        module.fail_json(msg='No id exist with display name %s' % display_name)
    return None
def ontapi(self):
    '''Return the 'minor-version' reported by system-get-ontapi-version.

    Falls back to '0' when the response has no 'minor-version' content.
    A NaApiError is reported through self.module.fail_json.
    '''
    api = 'system-get-ontapi-version'
    version_request = netapp_utils.zapi.NaElement(api)
    try:
        response = self.server.invoke_successfully(version_request, enable_tunneling=False)
        minor_version = response.get_child_content('minor-version')
        if minor_version is None:
            return '0'
        return minor_version
    except netapp_utils.zapi.NaApiError as exc:
        details = (api, to_native(exc))
        self.module.fail_json(msg="Error calling API %s: %s" % details,
                              exception=traceback.format_exc())
# NOTE(review): fragment - `p` and `sudoable` are defined outside this view.
display.debug("done running command with Popen()")
# Only watch for a prompt when a become plugin is set, it expects a prompt,
# and the command is sudoable.
if self.become and self.become.expect_prompt() and sudoable:
# Add O_NONBLOCK to the existing flags of both output pipes.
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fcntl.fcntl(p.stdout, fcntl.F_GETFL) | os.O_NONBLOCK)
fcntl.fcntl(p.stderr, fcntl.F_SETFL, fcntl.fcntl(p.stderr, fcntl.F_GETFL) | os.O_NONBLOCK)
# Wait for readability on stdout and stderr with a selector.
selector = selectors.DefaultSelector()
selector.register(p.stdout, selectors.EVENT_READ)
selector.register(p.stderr, selectors.EVENT_READ)
# Everything read from either pipe so far, as bytes.
become_output = b''
try:
# Keep reading until the accumulated output shows either success or a password prompt.
while not self.become.check_success(become_output) and not self.become.check_password_prompt(become_output):
events = selector.select(self._play_context.timeout)
# Nothing became readable within the timeout: drain the process and abort.
if not events:
stdout, stderr = p.communicate()
raise AnsibleError('timeout waiting for privilege escalation password prompt:\n' + to_native(become_output))
for key, event in events:
if key.fileobj == p.stdout:
chunk = p.stdout.read()
elif key.fileobj == p.stderr:
chunk = p.stderr.read()
# A falsy chunk is treated as the output having been closed before the prompt appeared.
if not chunk:
stdout, stderr = p.communicate()
raise AnsibleError('privilege output closed while waiting for password prompt:\n' + to_native(become_output))
become_output += chunk
finally:
# The selector is closed whether the loop finished or raised.
selector.close()
# The loop ended without a success match, so send the become password followed by a newline.
if not self.become.check_success(become_output):
p.stdin.write(to_bytes(self._play_context.become_pass, errors='surrogate_or_strict') + b'\n')
def set_playbook_paths(self, b_playbook_paths):
    """Record the ``collections`` directory located under each playbook path.

    A single path is accepted as well as an iterable of paths. Duplicate
    paths are dropped, keeping the first occurrence and the original order.
    The result is stored in ``self._n_playbook_paths``.
    """
    if isinstance(b_playbook_paths, string_types):
        b_playbook_paths = [b_playbook_paths]

    # track visited paths; we have to preserve the dir order as-passed in case there are duplicate collections (first one wins)
    seen_paths = set()
    n_collection_dirs = []
    for b_path in b_playbook_paths:
        if b_path in seen_paths:
            continue
        seen_paths.add(b_path)
        # ensure the paths are native strings (Python seems to do this for package paths etc, so assume it's safe)
        n_collection_dirs.append(os.path.join(to_native(b_path), 'collections'))

    self._n_playbook_paths = n_collection_dirs
    # FIXME: only allow setting this once, or handle any necessary cache/package path invalidations internally?
# NOTE(review): fragment - `fmt`, `b_path`, `b_dest` and `b_arcroot` are defined outside this view.
# Write b_path to b_dest using the writer that matches fmt.
if fmt == 'zip':
# Positional arguments are mode 'w', ZIP_DEFLATED compression and allowZip64=True.
arcfile = zipfile.ZipFile(
to_native(b_dest, errors='surrogate_or_strict', encoding='ascii'),
'w',
zipfile.ZIP_DEFLATED,
True
)
# The archive member name is b_path with its first len(b_arcroot) bytes removed.
arcfile.write(
to_native(b_path, errors='surrogate_or_strict', encoding='ascii'),
to_native(b_path[len(b_arcroot):], errors='surrogate_or_strict')
)
arcfile.close()
state = 'archive' # because all zip files are archives
elif fmt == 'tar':
# Opened with plain mode 'w'; the member is added under the path as given.
arcfile = tarfile.open(to_native(b_dest, errors='surrogate_or_strict', encoding='ascii'), 'w')
arcfile.add(to_native(b_path, errors='surrogate_or_strict', encoding='ascii'))
arcfile.close()
else:
# Remaining formats: copy the source bytes through a compressing file object.
f_in = open(b_path, 'rb')
n_dest = to_native(b_dest, errors='surrogate_or_strict', encoding='ascii')
if fmt == 'gz':
f_out = gzip.open(n_dest, 'wb')
elif fmt == 'bz2':
f_out = bz2.BZ2File(n_dest, 'wb')
elif fmt == 'xz':
f_out = lzma.LZMAFile(n_dest, 'wb')
else:
raise OSError("Invalid format")
# NOTE(review): f_in and f_out are not closed within the visible lines - confirm they are closed after this fragment.
shutil.copyfileobj(f_in, f_out)
def is_pubkey(string):
    """Return True when ``string`` contains a PGP public key block.

    The input is converted with ``to_native`` and matched with DOTALL, so the
    BEGIN/END markers may be surrounded by other text and span several lines.
    """
    pgp_regex = ".*?(-----BEGIN PGP PUBLIC KEY BLOCK-----.*?-----END PGP PUBLIC KEY BLOCK-----).*"
    candidate = to_native(string, errors='surrogate_or_strict')
    found = re.match(pgp_regex, candidate, re.DOTALL)
    return found is not None