Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_client(self):
    """Build a ``KeyVaultClient`` using MSI or service-principal credentials.

    When the ``USE_MSI`` environment variable equals ``"true"``
    (case-insensitive; defaults to ``"false"``), MSI authentication against
    ``VAULT_RESOURCE_NAME`` is used. Otherwise ``self._parse_sp_file()`` is
    called and an ADAL client-credentials flow is set up from
    ``self.tenant_id``, ``self.client_id`` and ``self.client_secret``.

    NOTE(review): presumably ``_parse_sp_file`` is what populates the
    tenant/client attributes read below -- confirm against its definition.

    Returns:
        A ``KeyVaultClient`` constructed with the selected credentials.
    """
    msi_requested = os.getenv("USE_MSI", "false").lower() == "true"
    if msi_requested:
        _logger.info('Using MSI')
        msi_credentials = MSIAuthentication(resource=VAULT_RESOURCE_NAME)
        return KeyVaultClient(msi_credentials)

    # Service-principal path: load the principal details, then build the
    # tenant-specific authority URL with exactly one '/' before the tenant.
    self._parse_sp_file()
    authority = AZURE_AUTHORITY_SERVER.rstrip('/') + '/' + self.tenant_id
    _logger.info('Using authority: %s', authority)
    auth_context = AuthenticationContext(authority)

    _logger.info('Using vault resource name: %s and client id: %s', VAULT_RESOURCE_NAME, self.client_id)
    sp_credentials = AdalAuthentication(
        auth_context.acquire_token_with_client_credentials,
        VAULT_RESOURCE_NAME,
        self.client_id,
        self.client_secret,
    )
    return KeyVaultClient(sp_credentials)
def __init__(self, batchToken: str, armToken: str, armUrl: str, storage_endpoint: str, account: BatchAccount):
    """Store the connection settings and create the Batch extensions client.

    Args:
        batchToken: Access token wrapped as the bearer token for
            ``self.batchCreds``.
        armToken: Access token wrapped as the bearer token for
            ``self.armCreds``.
        armUrl: Passed to the client as ``mgmt_base_url``.
        storage_endpoint: Passed to the client as ``storage_endpoint``.
        account: Batch account; its ``name``, ``account_endpoint`` and
            ``subscription_id`` attributes are read here.
    """

    def _batch_token():
        # Token provider handed to AdalAuthentication for the Batch token.
        return {'accessToken': batchToken, 'tokenType': 'Bearer'}

    def _arm_token():
        # Token provider handed to AdalAuthentication for the ARM token.
        return {'accessToken': armToken, 'tokenType': 'Bearer'}

    self.batchCreds = AdalAuthentication(_batch_token)
    self.armCreds = AdalAuthentication(_arm_token)
    self.armUrl = armUrl
    self.storage_endpoint = storage_endpoint
    self.account = account

    # The account endpoint is combined with an https scheme prefix to form
    # the Batch service URL.
    client_kwargs = {
        'credentials': self.batchCreds,
        'batch_account': self.account.name,
        'batch_url': 'https://{0}'.format(account.account_endpoint),
        'subscription_id': account.subscription_id,
        'mgmt_credentials': self.armCreds,
        'mgmt_base_url': self.armUrl,
        'storage_endpoint': self.storage_endpoint,
    }
    self.client = azext.batch.BatchExtensionsClient(**client_kwargs)
def __init__(self, batchToken: str, armToken: str, armUrl: str, storage_endpoint: str, account: BatchAccount):
    """Store the connection settings and create the Batch extensions client.

    Args:
        batchToken: Access token wrapped as the bearer token for
            ``self.batchCreds``.
        armToken: Access token wrapped as the bearer token for
            ``self.armCreds``.
        armUrl: Passed to the client as ``mgmt_base_url``.
        storage_endpoint: Passed to the client as ``storage_endpoint``.
        account: Batch account; its ``name``, ``account_endpoint`` and
            ``subscription_id`` attributes are read here.
    """

    def _batch_token():
        # Token provider handed to AdalAuthentication for the Batch token.
        return {'accessToken': batchToken, 'tokenType': 'Bearer'}

    def _arm_token():
        # Token provider handed to AdalAuthentication for the ARM token.
        return {'accessToken': armToken, 'tokenType': 'Bearer'}

    self.batchCreds = AdalAuthentication(_batch_token)
    self.armCreds = AdalAuthentication(_arm_token)
    self.armUrl = armUrl
    self.storage_endpoint = storage_endpoint
    self.account = account

    # The account endpoint is combined with an https scheme prefix to form
    # the Batch service URL.
    client_kwargs = {
        'credentials': self.batchCreds,
        'batch_account': self.account.name,
        'batch_url': 'https://{0}'.format(account.account_endpoint),
        'subscription_id': account.subscription_id,
        'mgmt_credentials': self.armCreds,
        'mgmt_base_url': self.armUrl,
        'storage_endpoint': self.storage_endpoint,
    }
    self.client = azext.batch.BatchExtensionsClient(**client_kwargs)
raise ValueError("Need activeDirectoryResourceId or resourceManagerEndpointUrl key")
resource = config_dict.get('activeDirectoryResourceId', config_dict['resourceManagerEndpointUrl'])
authority_url = config_dict['activeDirectoryEndpointUrl']
is_adfs = bool(re.match('.+(/adfs|/adfs/)$', authority_url, re.I))
if is_adfs:
authority_url = authority_url.rstrip('/') # workaround: ADAL is known to reject auth urls with trailing /
else:
authority_url = authority_url + '/' + config_dict['tenantId']
context = adal.AuthenticationContext(
authority_url,
api_version=None,
validate_authority=not is_adfs
)
parameters['credentials'] = AdalAuthentication(
context.acquire_token_with_client_credentials,
resource,
config_dict['clientId'],
config_dict['clientSecret']
)
parameters.update(kwargs)
return _instantiate_client(client_class, **parameters)