Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self, terms, variables, **kwargs):
    """Return the object id of the service principal for an Azure client id.

    Options are read from ``kwargs`` through ``set_options``/``get_option``:
    ``azure_client_id`` and ``azure_secret`` are required; ``azure_tenant``
    and ``azure_cloud_environment`` are optional.

    Returns:
        list: ``object_id`` of the first matching service principal, split
        on ``','``.

    Raises:
        AnsibleError: if the client id or secret is missing, if no service
            principal matches the client id, or if the Graph request fails
            with ``CloudError``.
    """
    self.set_options(direct=kwargs)

    # NOTE(review): the second positional argument to get_option is passed
    # through exactly as before; confirm that it is honoured as a default.
    credentials = {
        'azure_client_id': self.get_option('azure_client_id', None),
        'azure_secret': self.get_option('azure_secret', None),
        'azure_tenant': self.get_option('azure_tenant', 'common'),
    }

    if credentials['azure_client_id'] is None or credentials['azure_secret'] is None:
        raise AnsibleError("Must specify azure_client_id and azure_secret")

    # Start from the public cloud and override it only when an environment
    # option is supplied. The previous code looked the option up in
    # ``credentials`` (where it was never stored) and bound the result to a
    # name that nothing below read, so the override never took effect.
    _cloud_environment = azure_cloud.AZURE_PUBLIC_CLOUD
    cloud_endpoint = self.get_option('azure_cloud_environment', None)
    if cloud_endpoint is not None:
        _cloud_environment = azure_cloud.get_cloud_from_metadata_endpoint(cloud_endpoint)

    graph_resource = _cloud_environment.endpoints.active_directory_graph_resource_id

    try:
        azure_credentials = ServicePrincipalCredentials(client_id=credentials['azure_client_id'],
                                                        secret=credentials['azure_secret'],
                                                        tenant=credentials['azure_tenant'],
                                                        resource=graph_resource)

        client = GraphRbacManagementClient(azure_credentials, credentials['azure_tenant'],
                                           base_url=graph_resource)

        response = list(client.service_principals.list(filter="appId eq '{0}'".format(credentials['azure_client_id'])))
    except CloudError as ex:
        raise AnsibleError("Failed to get service principal object id: %s" % to_native(ex))

    # An empty result used to surface as a bare IndexError from response[0].
    if not response:
        raise AnsibleError(
            "Failed to get service principal object id: no service principal found for client id %s"
            % credentials['azure_client_id'])

    sp = response[0]
    return sp.object_id.split(',')
def run(self, terms, variables, **kwargs):
# Looks up the service principal whose appId equals the configured
# azure_client_id and returns its object_id split on ','.
# NOTE(review): this excerpt ends inside the `try` below; no except/finally
# clause is visible, and the original indentation has been lost.
self.set_options(direct=kwargs)
# Collect the options used to authenticate.
credentials = {}
credentials['azure_client_id'] = self.get_option('azure_client_id', None)
credentials['azure_secret'] = self.get_option('azure_secret', None)
credentials['azure_tenant'] = self.get_option('azure_tenant', 'common')
# Client id and secret are both mandatory.
if credentials['azure_client_id'] is None or credentials['azure_secret'] is None:
raise AnsibleError("Must specify azure_client_id and azure_secret")
# Public cloud is the starting value for the environment used below.
_cloud_environment = azure_cloud.AZURE_PUBLIC_CLOUD
if self.get_option('azure_cloud_environment', None) is not None:
# NOTE(review): 'azure_cloud_environment' is never stored in `credentials`
# above, so this subscript raises KeyError; the result is also bound to
# `cloud_environment`, while the code below reads `_cloud_environment`.
cloud_environment = azure_cloud.get_cloud_from_metadata_endpoint(credentials['azure_cloud_environment'])
try:
# Credentials and client both target the cloud's
# active_directory_graph_resource_id endpoint.
azure_credentials = ServicePrincipalCredentials(client_id=credentials['azure_client_id'],
secret=credentials['azure_secret'],
tenant=credentials['azure_tenant'],
resource=_cloud_environment.endpoints.active_directory_graph_resource_id)
client = GraphRbacManagementClient(azure_credentials, credentials['azure_tenant'],
base_url=_cloud_environment.endpoints.active_directory_graph_resource_id)
# Filter service principals by appId and materialise the results as a list.
response = list(client.service_principals.list(filter="appId eq '{0}'".format(credentials['azure_client_id'])))
# Only the first match is used; an empty list raises IndexError here.
sp = response[0]
return sp.object_id.split(',')
def get_poller_result(poller, wait=5):
    """Wait for *poller* to finish, printing progress, and return its result.

    While ``poller.done()`` is false, a status line is rewritten in place
    (carriage return, no newline) showing ``poller.status()``, the wait
    interval and one extra dot per iteration; then ``poller.wait`` is called
    with ``timeout=wait``. A newline is printed once polling ends.

    A ``CloudError`` raised along the way is logged via ``logger.error`` and
    re-raised.
    """
    progress_template = "\r\t=> Current status: {}, waiting for {} sec{}"
    try:
        rounds = 0
        while True:
            if poller.done():
                break
            rounds += 1
            progress = progress_template.format(poller.status(), wait, rounds * '.')
            print(progress, end='', flush=True)
            poller.wait(timeout=wait)
        # Terminate the in-place status line before handing back the result.
        print()
        return poller.result()
    except azure_exceptions.CloudError as e:
        logger.error(str(e))
        raise e
def get_long_running_output(response):
    """Validate a long-running-operation response and optionally wrap it.

    Raises ``CloudError`` (tagged with the ``x-ms-request-id`` response
    header) unless the status code is 202. When the enclosing scope's ``raw``
    flag is truthy, returns a ``ClientRawResponse`` with no deserialized body
    and the ``Location`` / ``Retry-After`` headers registered; otherwise
    returns ``None``.
    """
    accepted_statuses = [202]
    if response.status_code not in accepted_statuses:
        error = CloudError(response)
        error.request_id = response.headers.get('x-ms-request-id')
        raise error

    if not raw:
        return None

    wrapped = ClientRawResponse(None, response)
    header_types = {
        'Location': 'str',
        'Retry-After': 'int',
    }
    wrapped.add_headers(header_types)
    return wrapped
# NOTE(review): excerpt only. The enclosing definition(s) are not visible and
# the original indentation is lost, so the nesting below cannot be confirmed.
# Wrap the response with no deserialized body (first argument is None).
client_raw_response = ClientRawResponse(None, response)
# Header names paired with type names; presumably the types add_headers uses
# when deserializing them. Confirm against the msrest API.
client_raw_response.add_headers({
'Azure-AsyncOperation': 'str',
'Location': 'str',
'Retry-After': 'int',
})
return client_raw_response
# Raw mode: send the request once and return the processed response directly.
if raw:
response = long_running_send()
return get_long_running_output(response)
# A 'long_running_operation_timeout' entry in operation_config takes
# precedence; self.config supplies the fallback value.
long_running_operation_timeout = operation_config.get(
'long_running_operation_timeout',
self.config.long_running_operation_timeout)
# Presumably the non-raw path: hand back a poller built from the send,
# output and status callables plus the timeout.
return AzureOperationPoller(
long_running_send, get_long_running_output,
get_long_running_status, long_running_operation_timeout)
# NOTE(review): excerpt only. The enclosing definition(s) are not visible and
# the original indentation is lost, so the nesting below cannot be confirmed.
# Raw mode: wrap the response with no deserialized body (first argument None).
if raw:
client_raw_response = ClientRawResponse(None, response)
# Header names paired with type names; presumably the types add_headers uses
# when deserializing them. Confirm against the msrest API.
client_raw_response.add_headers({
'Location': 'str',
'Retry-After': 'int',
})
return client_raw_response
# Raw mode: send the request once and return the processed response directly.
if raw:
response = long_running_send()
return get_long_running_output(response)
# A 'long_running_operation_timeout' entry in operation_config takes
# precedence; self.config supplies the fallback value.
long_running_operation_timeout = operation_config.get(
'long_running_operation_timeout',
self.config.long_running_operation_timeout)
# Presumably the non-raw path: hand back a poller built from the send,
# output and status callables plus the timeout.
return AzureOperationPoller(
long_running_send, get_long_running_output,
get_long_running_status, long_running_operation_timeout)
# NOTE(review): excerpt only. The enclosing definition(s) are not visible and
# the original indentation is lost; whatever condition guards this raise is
# outside the excerpt.
# Build a CloudError from the response and attach the x-ms-request-id header.
exp = CloudError(response)
exp.request_id = response.headers.get('x-ms-request-id')
raise exp
# Raw mode: wrap the response with no deserialized body and no extra headers.
if raw:
client_raw_response = ClientRawResponse(None, response)
return client_raw_response
# Raw mode: send the request once and return the processed response directly.
if raw:
response = long_running_send()
return get_long_running_output(response)
# A 'long_running_operation_timeout' entry in operation_config takes
# precedence; self.config supplies the fallback value.
long_running_operation_timeout = operation_config.get(
'long_running_operation_timeout',
self.config.long_running_operation_timeout)
# Presumably the non-raw path: hand back a poller built from the send,
# output and status callables plus the timeout.
return AzureOperationPoller(
long_running_send, get_long_running_output,
get_long_running_status, long_running_operation_timeout)
# NOTE(review): excerpt only. The enclosing definition(s) are not visible and
# the original indentation is lost, so the nesting below cannot be confirmed.
# Deserialize the response using the 'Product' type name.
deserialized = self._deserialize('Product', response)
# Raw mode returns the deserialized value together with the response;
# otherwise only the deserialized value is returned.
if raw:
client_raw_response = ClientRawResponse(deserialized, response)
return client_raw_response
return deserialized
# Raw mode: send the request once and return the processed response directly.
if raw:
response = long_running_send()
return get_long_running_output(response)
# A 'long_running_operation_timeout' entry in operation_config takes
# precedence; self.config supplies the fallback value.
long_running_operation_timeout = operation_config.get(
'long_running_operation_timeout',
self.config.long_running_operation_timeout)
# Presumably the non-raw path: hand back a poller built from the send,
# output and status callables plus the timeout.
return AzureOperationPoller(
long_running_send, get_long_running_output,
get_long_running_status, long_running_operation_timeout)
def get_key_vault_client():
    """Create a KeyVaultClient using service-principal credentials from the environment.

    The client id, secret and tenant are read from the ``CLIENT_ID``,
    ``CLIENT_SECRET`` and ``TENANT_ID`` environment variables; a variable
    that is not set is passed through as ``None``. The credentials are
    requested for the ``https://vault.azure.net`` resource.
    """
    env = os.environ
    vault_credentials = ServicePrincipalCredentials(
        client_id=env.get("CLIENT_ID"),
        secret=env.get("CLIENT_SECRET"),
        tenant=env.get("TENANT_ID"),
        resource='https://vault.azure.net',
    )
    return KeyVaultClient(vault_credentials)
def _determine_auth(**kwargs):
'''
Acquire Azure ARM Credentials
'''
if 'profile' in kwargs:
azure_credentials = __salt__['config.option'](kwargs['profile'])
kwargs.update(azure_credentials)
service_principal_creds_kwargs = ['client_id', 'secret', 'tenant']
user_pass_creds_kwargs = ['username', 'password']
try:
if kwargs.get('cloud_environment') and kwargs.get('cloud_environment').startswith('http'):
cloud_env = get_cloud_from_metadata_endpoint(kwargs['cloud_environment'])
else:
cloud_env_module = importlib.import_module('msrestazure.azure_cloud')
cloud_env = getattr(cloud_env_module, kwargs.get('cloud_environment', 'AZURE_PUBLIC_CLOUD'))
except (AttributeError, ImportError, MetadataEndpointError):
raise sys.exit('The Azure cloud environment {0} is not available.'.format(kwargs['cloud_environment']))
if set(service_principal_creds_kwargs).issubset(kwargs):
if not (kwargs['client_id'] and kwargs['secret'] and kwargs['tenant']):
raise SaltInvocationError(
'The client_id, secret, and tenant parameters must all be '
'populated if using service principals.'
)
else:
credentials = ServicePrincipalCredentials(kwargs['client_id'],
kwargs['secret'],
tenant=kwargs['tenant'],