Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self, terms, variables, **kwargs):
self.set_options(direct=kwargs)
credentials = {}
credentials['azure_client_id'] = self.get_option('azure_client_id', None)
credentials['azure_secret'] = self.get_option('azure_secret', None)
credentials['azure_tenant'] = self.get_option('azure_tenant', 'common')
if credentials['azure_client_id'] is None or credentials['azure_secret'] is None:
raise AnsibleError("Must specify azure_client_id and azure_secret")
_cloud_environment = azure_cloud.AZURE_PUBLIC_CLOUD
if self.get_option('azure_cloud_environment', None) is not None:
cloud_environment = azure_cloud.get_cloud_from_metadata_endpoint(credentials['azure_cloud_environment'])
try:
azure_credentials = ServicePrincipalCredentials(client_id=credentials['azure_client_id'],
secret=credentials['azure_secret'],
tenant=credentials['azure_tenant'],
resource=_cloud_environment.endpoints.active_directory_graph_resource_id)
client = GraphRbacManagementClient(azure_credentials, credentials['azure_tenant'],
base_url=_cloud_environment.endpoints.active_directory_graph_resource_id)
response = list(client.service_principals.list(filter="appId eq '{0}'".format(credentials['azure_client_id'])))
sp = response[0]
return sp.object_id.split(',')
self._adfs_authority_url = None
self._resource = None
self.debug = False
if args.debug:
self.debug = True
self.credentials = self._get_credentials(args)
if not self.credentials:
self.fail("Failed to get credentials. Either pass as parameters, set environment variables, "
"or define a profile in ~/.azure/credentials.")
# if cloud_environment specified, look up/build Cloud object
raw_cloud_env = self.credentials.get('cloud_environment')
if not raw_cloud_env:
self._cloud_environment = azure_cloud.AZURE_PUBLIC_CLOUD # SDK default
else:
# try to look up "well-known" values via the name attribute on azure_cloud members
all_clouds = [x[1] for x in inspect.getmembers(azure_cloud) if isinstance(x[1], azure_cloud.Cloud)]
matched_clouds = [x for x in all_clouds if x.name == raw_cloud_env]
if len(matched_clouds) == 1:
self._cloud_environment = matched_clouds[0]
elif len(matched_clouds) > 1:
self.fail("Azure SDK failure: more than one cloud matched for cloud_environment name '{0}'".format(raw_cloud_env))
else:
if not urlparse.urlparse(raw_cloud_env).scheme:
self.fail("cloud_environment must be an endpoint discovery URL or one of {0}".format([x.name for x in all_clouds]))
try:
self._cloud_environment = azure_cloud.get_cloud_from_metadata_endpoint(raw_cloud_env)
except Exception as e:
self.fail("cloud_environment {0} could not be resolved: {1}".format(raw_cloud_env, e.message))
def convert_cloud_type(cloud_type: str) -> msrestazure.azure_cloud.Cloud:
"""Convert clout type string to object
:param cloud_type: cloud type to convert
:return: cloud object
"""
if cloud_type == 'public':
cloud = msrestazure.azure_cloud.AZURE_PUBLIC_CLOUD
elif cloud_type == 'china':
cloud = msrestazure.azure_cloud.AZURE_CHINA_CLOUD
elif cloud_type == 'germany':
cloud = msrestazure.azure_cloud.AZURE_GERMAN_CLOUD
elif cloud_type == 'usgov':
cloud = msrestazure.azure_cloud.AZURE_US_GOV_CLOUD
else:
raise ValueError('unknown cloud_type: {}'.format(cloud_type))
return cloud
self.fail("Failed to get credentials. Either pass as parameters, set environment variables, "
"define a profile in ~/.azure/credentials, or install Azure CLI and log in (`az login`).")
# cert validation mode precedence: module-arg, credential profile, env, "validate"
self._cert_validation_mode = cert_validation_mode or self.credentials.get('cert_validation_mode') or \
os.environ.get('AZURE_CERT_VALIDATION_MODE') or 'validate'
if self._cert_validation_mode not in ['validate', 'ignore']:
self.fail('invalid cert_validation_mode: {0}'.format(self._cert_validation_mode))
# if cloud_environment specified, look up/build Cloud object
raw_cloud_env = self.credentials.get('cloud_environment')
if self.credentials.get('credentials') is not None and raw_cloud_env is not None:
self._cloud_environment = raw_cloud_env
elif not raw_cloud_env:
self._cloud_environment = azure_cloud.AZURE_PUBLIC_CLOUD # SDK default
else:
# try to look up "well-known" values via the name attribute on azure_cloud members
all_clouds = [x[1] for x in inspect.getmembers(azure_cloud) if isinstance(x[1], azure_cloud.Cloud)]
matched_clouds = [x for x in all_clouds if x.name == raw_cloud_env]
if len(matched_clouds) == 1:
self._cloud_environment = matched_clouds[0]
elif len(matched_clouds) > 1:
self.fail("Azure SDK failure: more than one cloud matched for cloud_environment name '{0}'".format(raw_cloud_env))
else:
if not urlparse.urlparse(raw_cloud_env).scheme:
self.fail("cloud_environment must be an endpoint discovery URL or one of {0}".format([x.name for x in all_clouds]))
try:
self._cloud_environment = azure_cloud.get_cloud_from_metadata_endpoint(raw_cloud_env)
except Exception as e:
self.fail("cloud_environment {0} could not be resolved: {1}".format(raw_cloud_env, e.message), exception=traceback.format_exc())
self._adfs_authority_url = None
self._resource = None
self.debug = False
if args.debug:
self.debug = True
self.credentials = self._get_credentials(args)
if not self.credentials:
self.fail("Failed to get credentials. Either pass as parameters, set environment variables, "
"or define a profile in ~/.azure/credentials.")
# if cloud_environment specified, look up/build Cloud object
raw_cloud_env = self.credentials.get('cloud_environment')
if not raw_cloud_env:
self._cloud_environment = azure_cloud.AZURE_PUBLIC_CLOUD # SDK default
else:
# try to look up "well-known" values via the name attribute on azure_cloud members
all_clouds = [x[1] for x in inspect.getmembers(azure_cloud) if isinstance(x[1], azure_cloud.Cloud)]
matched_clouds = [x for x in all_clouds if x.name == raw_cloud_env]
if len(matched_clouds) == 1:
self._cloud_environment = matched_clouds[0]
elif len(matched_clouds) > 1:
self.fail("Azure SDK failure: more than one cloud matched for cloud_environment name '{0}'".format(raw_cloud_env))
else:
if not urlparse.urlparse(raw_cloud_env).scheme:
self.fail("cloud_environment must be an endpoint discovery URL or one of {0}".format([x.name for x in all_clouds]))
try:
self._cloud_environment = azure_cloud.get_cloud_from_metadata_endpoint(raw_cloud_env)
except Exception as e:
self.fail("cloud_environment {0} could not be resolved: {1}".format(raw_cloud_env, e.message))
self._resource_client = None
self._network_client = None
self.debug = False
if args.debug:
self.debug = True
self.credentials = self._get_credentials(args)
if not self.credentials:
self.fail("Failed to get credentials. Either pass as parameters, set environment variables, "
"or define a profile in ~/.azure/credentials.")
# if cloud_environment specified, look up/build Cloud object
raw_cloud_env = self.credentials.get('cloud_environment')
if not raw_cloud_env:
self._cloud_environment = azure_cloud.AZURE_PUBLIC_CLOUD # SDK default
else:
# try to look up "well-known" values via the name attribute on azure_cloud members
all_clouds = [x[1] for x in inspect.getmembers(azure_cloud) if isinstance(x[1], azure_cloud.Cloud)]
matched_clouds = [x for x in all_clouds if x.name == raw_cloud_env]
if len(matched_clouds) == 1:
self._cloud_environment = matched_clouds[0]
elif len(matched_clouds) > 1:
self.fail("Azure SDK failure: more than one cloud matched for cloud_environment name '{0}'".format(raw_cloud_env))
else:
if not urlparse.urlparse(raw_cloud_env).scheme:
self.fail("cloud_environment must be an endpoint discovery URL or one of {0}".format([x.name for x in all_clouds]))
try:
self._cloud_environment = azure_cloud.get_cloud_from_metadata_endpoint(raw_cloud_env)
except Exception as e:
self.fail("cloud_environment {0} could not be resolved: {1}".format(raw_cloud_env, e.message))
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.resource.resources.models import DeploymentMode
from msrestazure.azure_cloud import AZURE_US_GOV_CLOUD
from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
import os
if os.environ.get("is_gov") == "1":
mycloud = AZURE_US_GOV_CLOUD
else:
mycloud = AZURE_PUBLIC_CLOUD
from optparse import OptionParser
parser = OptionParser()
parser.add_option('--action',help="external|internal|complete")
parser.add_option('--debug',action="store_true")
parser.add_option('--private',action="store_true")
(options, args) = parser.parse_args()
import os
import pprint
import re
import sys
import json
import time
self.fail("Failed to get credentials. Either pass as parameters, set environment variables, "
"define a profile in ~/.azure/credentials, or install Azure CLI and log in (`az login`).")
# cert validation mode precedence: module-arg, credential profile, env, "validate"
self._cert_validation_mode = cert_validation_mode or self.credentials.get('cert_validation_mode') or \
os.environ.get('AZURE_CERT_VALIDATION_MODE') or 'validate'
if self._cert_validation_mode not in ['validate', 'ignore']:
self.fail('invalid cert_validation_mode: {0}'.format(self._cert_validation_mode))
# if cloud_environment specified, look up/build Cloud object
raw_cloud_env = self.credentials.get('cloud_environment')
if self.credentials.get('credentials') is not None and raw_cloud_env is not None:
self._cloud_environment = raw_cloud_env
elif not raw_cloud_env:
self._cloud_environment = azure_cloud.AZURE_PUBLIC_CLOUD # SDK default
else:
# try to look up "well-known" values via the name attribute on azure_cloud members
all_clouds = [x[1] for x in inspect.getmembers(azure_cloud) if isinstance(x[1], azure_cloud.Cloud)]
matched_clouds = [x for x in all_clouds if x.name == raw_cloud_env]
if len(matched_clouds) == 1:
self._cloud_environment = matched_clouds[0]
elif len(matched_clouds) > 1:
self.fail("Azure SDK failure: more than one cloud matched for cloud_environment name '{0}'".format(raw_cloud_env))
else:
if not urlparse.urlparse(raw_cloud_env).scheme:
self.fail("cloud_environment must be an endpoint discovery URL or one of {0}".format([x.name for x in all_clouds]))
try:
self._cloud_environment = azure_cloud.get_cloud_from_metadata_endpoint(raw_cloud_env)
except Exception as e:
self.fail("cloud_environment {0} could not be resolved: {1}".format(raw_cloud_env, e.message), exception=traceback.format_exc())
self._adfs_authority_url = None
self._resource = None
self.debug = False
if args.debug:
self.debug = True
self.credentials = self._get_credentials(args)
if not self.credentials:
self.fail("Failed to get credentials. Either pass as parameters, set environment variables, "
"or define a profile in ~/.azure/credentials.")
# if cloud_environment specified, look up/build Cloud object
raw_cloud_env = self.credentials.get('cloud_environment')
if not raw_cloud_env:
self._cloud_environment = azure_cloud.AZURE_PUBLIC_CLOUD # SDK default
else:
# try to look up "well-known" values via the name attribute on azure_cloud members
all_clouds = [x[1] for x in inspect.getmembers(azure_cloud) if isinstance(x[1], azure_cloud.Cloud)]
matched_clouds = [x for x in all_clouds if x.name == raw_cloud_env]
if len(matched_clouds) == 1:
self._cloud_environment = matched_clouds[0]
elif len(matched_clouds) > 1:
self.fail("Azure SDK failure: more than one cloud matched for cloud_environment name '{0}'".format(raw_cloud_env))
else:
if not urlparse.urlparse(raw_cloud_env).scheme:
self.fail("cloud_environment must be an endpoint discovery URL or one of {0}".format([x.name for x in all_clouds]))
try:
self._cloud_environment = azure_cloud.get_cloud_from_metadata_endpoint(raw_cloud_env)
except Exception as e:
self.fail("cloud_environment {0} could not be resolved: {1}".format(raw_cloud_env, e.message))