Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_key_vault_client():
return KeyVaultClient(ServicePrincipalCredentials(
client_id=os.environ.get("CLIENT_ID"),
secret=os.environ.get("CLIENT_SECRET"),
tenant=os.environ.get("TENANT_ID"),
resource='https://vault.azure.net'
))
rslist= lower_rg.split(',')
if len(rslist) > 0:
filter_resource_groups = rslist
additional_params = []
if args.params:
pairlist = args.params.split('&')
for s in pairlist:
p = s.split('=')
if (len(p)==2):
additional_params.append(p)
### Load Config
cconf = ClientProfileConfig(client_profile_file)
### Get Target virtual machines info List
credentials = ServicePrincipalCredentials(
cconf.client_id,cconf.client_scret,
tenant=cconf.tenant_id)
compute_client = ComputeManagementClient(
credentials, cconf.subscription_id)
network_client = NetworkManagementClient(
credentials, cconf.subscription_id)
target_vm_list = []
for vm in compute_client.virtual_machines.list_all():
target_vm = {}
target_vm['name'] = vm.name
vm_rgroup = get_resorucegroup_from_vmid(vm.id)
# Filtering by resource group if needed
if len(filter_resource_groups) > 0:
r = vm_rgroup.lower()
if not (r in filter_resource_groups):
continue # skip
def get_key_vault_credentials():
"""This tries to get a token using MSI, or fallback to SP env variables.
"""
if "APPSETTING_WEBSITE_SITE_NAME" in os.environ:
return MSIAuthentication(
resource='https://vault.azure.net'
)
else:
return ServicePrincipalCredentials(
client_id=os.environ['AZURE_CLIENT_ID'],
secret=os.environ['AZURE_CLIENT_SECRET'],
tenant=os.environ['AZURE_TENANT_ID'],
resource='https://vault.azure.net'
)
def return_azure_creds(app_id,key, tenant_id):
return ServicePrincipalCredentials(
client_id=app_id,
secret=key,
tenant=tenant_id
)
def setup_sample(self):
"""
Provides common setup for Key Vault samples, such as creating rest clients, creating a sample resource group
if needed, and ensuring proper access for the service principal.
:return: None
"""
if not self._setup_complete:
self.mgmt_creds = ServicePrincipalCredentials(client_id=self.config.client_id, secret=self.config.client_secret,
tenant=self.config.tenant_id)
self.data_creds = ServicePrincipalCredentials(client_id=self.config.client_id, secret=self.config.client_secret,
tenant=self.config.tenant_id)
self.resource_mgmt_client = ResourceManagementClient(self.mgmt_creds, self.config.subscription_id)
# ensure the service principle has key vault as a valid provider
self.resource_mgmt_client.providers.register('Microsoft.KeyVault')
# ensure the intended resource group exists
self.resource_mgmt_client.resource_groups.create_or_update(self.config.group_name, {'location': self.config.location})
self.keyvault_mgmt_client = KeyVaultManagementClient(self.mgmt_creds, self.config.subscription_id)
self.keyvault_data_client = KeyVaultClient(self.data_creds)
self._setup_complete = True
tenant_id = parameters[self.PARAM_TENANT_ID]
# Get an Authentication token using ADAL.
context = adal.AuthenticationContext(self.AZURE_AUTH_ENDPOINT + tenant_id)
try:
token_response = context.acquire_token_with_client_credentials(
self.AZURE_RESOURCE_URL, app_id, app_secret_key)
except adal.adal_error.AdalError as e:
raise AgentConfigurationException(
"Unable to communicate with Azure! Please check your cloud "
"configuration. Reason: {}".format(e.message))
token_response.get('accessToken')
# To access Azure resources for an application, we need a Service Principal
# with the accurate role assignment. It can be created using the Azure CLI.
credentials = ServicePrincipalCredentials(client_id=app_id,
secret=app_secret_key,
tenant=tenant_id)
return credentials