def get_keyvault_client(self):
# Return a KeyVaultClient, trying managed identity (MSI) first and dropping
# through to the service-principal path below when that attempt raises.
# NOTE: this listing has lost its indentation and stops inside the
# ServicePrincipalCredentials(...) call; the rest of the function is not shown.
try:
self.log("Get KeyVaultClient from MSI")
# The MSI credential is requested for the Key Vault resource URL.
credentials = MSIAuthentication(resource='https://vault.azure.net')
return KeyVaultClient(credentials)
# NOTE(review): the exception is discarded without being logged, so the reason
# the MSI attempt failed is lost and the code moves on to the stored client
# secret. Logging the exception before falling back would make that visible.
except Exception:
self.log("Get KeyVaultClient from service principal")
# Create KeyVault Client using KeyVault auth class and auth_callback
def auth_callback(server, resource, scope):
# Only client_id and secret are checked here, although the message also asks
# for a tenant. Whether self.fail() stops execution is not visible in this listing.
if self.credentials['client_id'] is None or self.credentials['secret'] is None:
self.fail('Please specify client_id, secret and tenant to access azure Key Vault.')
tenant = self.credentials.get('tenant')
# An unset or empty tenant is replaced by "common".
# NOTE(review): the line above reads the key with .get() while this test indexes
# it directly; confirm a missing 'tenant' key cannot occur, and that "common"
# is an intended tenant for a service-principal login.
if not self.credentials['tenant']:
tenant = "common"
authcredential = ServicePrincipalCredentials(
client_id=self.credentials['client_id'],
secret=self.credentials['secret'],
def get_keyvault_client(self):
# Return a KeyVaultClient, trying managed identity (MSI) first and dropping
# through to the service-principal path below when that attempt raises.
# NOTE: this listing has lost its indentation and stops inside the
# ServicePrincipalCredentials(...) call; the rest of the function is not shown.
try:
self.log("Get KeyVaultClient from MSI")
# The MSI credential is requested for the Key Vault resource URL.
credentials = MSIAuthentication(resource='https://vault.azure.net')
return KeyVaultClient(credentials)
# NOTE(review): a bare `except:` also intercepts KeyboardInterrupt and
# SystemExit, and nothing about the failure is logged. Catching Exception and
# logging it would keep the fallback while showing why MSI was not used.
except:
self.log("Get KeyVaultClient from service principal")
# Create KeyVault Client using KeyVault auth class and auth_callback
def auth_callback(server, resource, scope):
# Only client_id and secret are checked here, although the message also asks
# for a tenant. Whether self.fail() stops execution is not visible in this listing.
if self.credentials['client_id'] is None or self.credentials['secret'] is None:
self.fail('Please specify client_id, secret and tenant to access azure Key Vault.')
tenant = self.credentials.get('tenant')
# An unset or empty tenant is replaced by "common".
# NOTE(review): confirm that "common" is an intended tenant for a
# service-principal login rather than requiring the tenant to be configured.
if not self.credentials['tenant']:
tenant = "common"
authcredential = ServicePrincipalCredentials(
client_id=self.credentials['client_id'],
secret=self.credentials['secret'],
def _get_msi_credentials(self, subscription_id_param=None, **kwargs):
    """Create MSI credentials and decide which subscription id goes with them.

    The subscription id is taken, in order, from ``subscription_id_param``,
    from the environment variable named by
    ``AZURE_CREDENTIAL_ENV_MAPPING['subscription_id']``, and finally from the
    first entry the subscription listing returns for the identity.

    An optional ``client_id`` keyword is passed through to
    ``MSIAuthentication``.

    Returns a dict with the keys ``'credentials'`` and ``'subscription_id'``.
    """
    msi_client_id = kwargs.get('client_id')
    # Built before any subscription lookup, so an error raised by the
    # constructor propagates to the caller unchanged.
    credentials = MSIAuthentication(client_id=msi_client_id)

    resolved_id = subscription_id_param
    if not resolved_id:
        resolved_id = os.environ.get(AZURE_CREDENTIAL_ENV_MAPPING['subscription_id'])

    if not resolved_id:
        try:
            # NOTE(review): with no explicit id, the first subscription in the
            # listing is used. When the identity can see more than one, later
            # calls made with these credentials target whichever came first;
            # passing the id explicitly removes that ambiguity.
            listing = SubscriptionClient(credentials).subscriptions.list()
            first_subscription = next(listing)
            resolved_id = str(first_subscription.subscription_id)
        except Exception as exc:
            # An empty listing raises StopIteration, which is also caught here
            # and reported with an empty detail string.
            template = ("Failed to get MSI token: {0}. "
                        "Please check whether your machine enabled MSI or grant access to any subscription.")
            self.fail(template.format(str(exc)))

    return {'credentials': credentials, 'subscription_id': resolved_id}
def _get_access_token_msi():
    """
    Return the Azure access token obtained through the MSI library.

    A new ``MSIAuthentication`` object is created with default arguments on
    each call, and the ``access_token`` entry of its ``token`` attribute is
    returned; ``.get`` gives ``None`` when that entry is absent.

    The returned string is a bearer credential: keep it out of logs and
    error messages.
    """
    msi_auth = MSIAuthentication()
    token_fields = msi_auth.token
    return token_fields.get('access_token')
def find_subscriptions_in_vm_with_msi(self, identity_id=None, allow_no_subscriptions=None):
# Authenticate with MSI and, when identity_id is given, work out which kind of
# identifier it is by trying each form in turn: an ARM resource id first, then
# a client id; the object-id attempt begins where this listing ends.
# NOTE: this listing has lost its indentation and is cut off inside the last
# `try:`; the handling of a missing identity_id and the return value are not shown.
# pylint: disable=too-many-statements
import jwt
from requests import HTTPError
from msrestazure.azure_active_directory import MSIAuthentication
from msrestazure.tools import is_valid_resource_id
# The token is requested for the active cloud's Active Directory resource id.
resource = self.cli_ctx.cloud.endpoints.active_directory_resource_id
if identity_id:
if is_valid_resource_id(identity_id):
msi_creds = MSIAuthentication(resource=resource, msi_res_id=identity_id)
identity_type = MsiAccountTypes.user_assigned_resource_id
else:
# Tracks whether the client-id attempt below succeeded.
authenticated = False
try:
msi_creds = MSIAuthentication(resource=resource, client_id=identity_id)
identity_type = MsiAccountTypes.user_assigned_client_id
authenticated = True
except HTTPError as ex:
# A 400 "Bad Request" is read as "identity_id is not a client id" and the
# next form is tried; any other HTTP error is re-raised.
# NOTE(review): requests' Response exposes `status_code`; confirm that
# `ex.response.status` exists on the response object raised here.
if ex.response.reason == 'Bad Request' and ex.response.status == 400:
logger.info('Sniff: not an MSI client id')
else:
raise
if not authenticated:
try:
identity_type = MsiAccountTypes.user_assigned_object_id
def get_keyvault_client(self):
# Return a KeyVaultClient, trying managed identity (MSI) first and dropping
# through to the service-principal path below when that attempt raises.
# NOTE: this listing has lost its indentation and stops inside the
# ServicePrincipalCredentials(...) call; the rest of the function is not shown.
try:
self.log("Get KeyVaultClient from MSI")
# The MSI credential is requested for the Key Vault resource URL.
credentials = MSIAuthentication(resource='https://vault.azure.net')
return KeyVaultClient(credentials)
# NOTE(review): the exception is discarded without being logged, so the reason
# the MSI attempt failed is lost and the code moves on to the stored client
# secret. Logging the exception before falling back would make that visible.
except Exception:
self.log("Get KeyVaultClient from service principal")
# Create KeyVault Client using KeyVault auth class and auth_callback
def auth_callback(server, resource, scope):
# Only client_id and secret are checked here, although the message also asks
# for a tenant. Whether self.fail() stops execution is not visible in this listing.
if self.credentials['client_id'] is None or self.credentials['secret'] is None:
self.fail('Please specify client_id, secret and tenant to access azure Key Vault.')
tenant = self.credentials.get('tenant')
# An unset or empty tenant is replaced by "common".
# NOTE(review): confirm that "common" is an intended tenant for a
# service-principal login rather than requiring the tenant to be configured.
if not self.credentials['tenant']:
tenant = "common"
authcredential = ServicePrincipalCredentials(
client_id=self.credentials['client_id'],
secret=self.credentials['secret'],
def _get_msi_credentials(self, subscription_id_param=None):
    """Probe for a usable MSI identity and return its credentials.

    A subscription listing is requested with default ``MSIAuthentication``
    credentials as a check that MSI is enabled. On success the result is a
    dict with ``'credentials'`` and ``'subscription_id'``; the id comes from
    ``subscription_id_param``, else from the environment variable named by
    ``AZURE_CREDENTIAL_ENV_MAPPING['subscription_id']``, else from the first
    listed subscription. Any exception raised by the probe gives ``None``.
    """
    # Created outside the try block: an error raised by the constructor
    # propagates to the caller instead of producing None.
    credentials = MSIAuthentication()

    preferred_id = subscription_id_param
    if not preferred_id:
        preferred_id = os.environ.get(AZURE_CREDENTIAL_ENV_MAPPING['subscription_id'])

    try:
        # The listing is fetched even when an id was supplied; it doubles as
        # the "is MSI enabled" check.
        listing = SubscriptionClient(credentials).subscriptions.list()
        first_subscription = next(listing)
        discovered_id = str(first_subscription.subscription_id)
    except Exception:
        # NOTE(review): every failure, including an empty listing
        # (StopIteration) or an unrelated error, is reported as None with
        # nothing logged, so the caller cannot tell why MSI was not used.
        return None
    else:
        return {
            'credentials': credentials,
            'subscription_id': preferred_id or discovered_id,
        }