Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
failed_sources = []
success_sources = []
with open('{0}/parameters.json'.format(os.getcwd()), 'r') as f:
parameters = f.read()
if re.search(r'"<.*>"', parameters):
print('Please replace placeholders in parameters.json')
exit()
parameters = json.loads(parameters)
key_vault = parameters['key vault']
tenant_id = str(json.loads(subprocess.check_output('az keyvault secret show --name tenant-id --vault-name {0}'.format(key_vault), shell=True))["value"])
app_id = str(json.loads(subprocess.check_output('az keyvault secret show --name app-id --vault-name {0}'.format(key_vault), shell=True))["value"])
app_secret = str(json.loads(subprocess.check_output('az keyvault secret show --name app-secret --vault-name {0}'.format(key_vault), shell=True))["value"])
authority_url = parameters['authority host url'] + '/' + tenant_id
context = adal.AuthenticationContext(authority_url)
token = context.acquire_token_with_client_credentials(
parameters['resource'],
app_id,
app_secret)
head = {'Authorization': 'Bearer ' + token['accessToken']}
subscription = str(json.loads(subprocess.check_output('az keyvault secret show --name subscription-id --vault-name {0}'.format(key_vault), shell=True))["value"])
resource_group = parameters['resource group']
workspace = parameters['workspace']
url = ENDPOINT.format(subscription, resource_group, workspace)
sources = ['Heartbeat', 'Syslog', 'Perf', 'ApacheAccess_CL', 'MySQL_CL', 'Custom_Log_CL']
distro = hostname.split('-')[0]
results = {}
results[distro] = {}
@unittest.skip('https://github.com/AzureAD/azure-activedirectory-library-for-python-priv/issues/21')
@httpretty.activate
def test_managed_happy_path(self):
util.setup_expected_user_realm_response_common(False)
response = util.create_response()
authorityUrl = response['authority'] + '/' + cp['tenant']
upRequest = self.setup_expected_username_password_request_response(200, response['wireResponse'], authorityUrl)
context = adal.AuthenticationContext(authorityUrl)
token_response = context.acquire_token_with_username_password(response['resource'], user_pass_params['username'], user_pass_params['password'], client_cred_params['clientId'])
self.assertTrue(util.isMatchTokenResponse(response['cachedResponse'], token_response), 'Response did not match expected: ' + JSON.stringify(token_response))
@httpretty.activate
def test_happy_path(self):
response = util.create_response()
self.setup_expected_auth_code_token_request_response(200, response['wireResponse'])
context = adal.AuthenticationContext(cp['authUrl'])
token_response = context.acquire_token_with_authorization_code(self.authorization_code, self.redirect_uri, response['resource'], cp['clientId'], cp['clientSecret'])
self.assertTrue(util.is_match_token_response(response['decodedResponse'], token_response), 'The response did not match what was expected')
req = httpretty.last_request()
util.match_standard_request_headers(req)
# Get the Azure Automation RunAs service principal certificate
cert = automationassets.get_automation_certificate("AzureRunAsCertificate")
sp_cert = crypto.load_pkcs12(cert)
pem_pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, sp_cert.get_privatekey())
# Get run as connection information for the Azure Automation service principal
runas_connection = automationassets.get_automation_connection("AzureRunAsConnection")
application_id = runas_connection["ApplicationId"]
thumbprint = runas_connection["CertificateThumbprint"]
tenant_id = runas_connection["TenantId"]
# Authenticate with service principal certificate
resource = "https://management.core.windows.net/"
authority_url = ("https://login.microsoftonline.com/" + tenant_id)
context = adal.AuthenticationContext(authority_url)
azure_credential = context.acquire_token_with_client_certificate(
resource,
application_id,
pem_pkey,
thumbprint)
# Return the token
return azure_credential.get('accessToken')
expiration_datetime = dateutil.parser.parse(expires_on)
else:
expiration_datetime = datetime.now() + timedelta(minutes=30)
current_datetime = datetime.now() + timedelta(minutes=1)
if expiration_datetime > current_datetime:
valid_token = token
logger().debug(f"_MyAadHelper::_validate_and_refresh_token - succeeded, no need to refresh yet, expires on {expires_on} - resource: '{resource}'")
else:
logger().debug(f"_MyAadHelper::_validate_and_refresh_token - token expires on {expires_on} need to refresh - resource: '{resource}'")
refresh_token = self._get_token_refresh_token(token)
if refresh_token is not None:
try:
if self._current_adal_context is None:
authority_uri = self._get_token_authority(token) or self._get_authority_from_token(token) or self._authority_uri
self._current_adal_context = AuthenticationContext(authority_uri, cache=None)
client_id = self._get_token_client_id(token) or self._get_client_id_from_token(token) or self._client_id
valid_token = self._current_adal_context.acquire_token_with_refresh_token(refresh_token, client_id, resource)
except Exception as e:
self._warn_on_token_validation_failure(f"access token expired on {expires_on}, failed to refresh access token, Exception: {e}")
logger().debug(f"_MyAadHelper::_validate_and_refresh_token - {'failed' if token is None else 'succeeded'} to refresh token - resource: '{resource}'")
else:
logger().debug(f"_MyAadHelper::_validate_and_refresh_token - failed to refresh expired token, token doesn't contain refresh token - resource: '{resource}'")
self._warn_on_token_validation_failure(f"access token expired on {expires_on}, and token entry has no refresh_token")
return valid_token
if any([kcsb.user_token, kcsb.application_token]):
self._token = kcsb.user_token or kcsb.application_token
self._authentication_method = AuthenticationMethod.aad_token
return
authority = kcsb.authority_id or "common"
aad_authority_uri = (
os.environ["AadAuthorityUri"] if "AadAuthorityUri" in os.environ else "https://login.microsoftonline.com/"
)
full_authority_uri = (
aad_authority_uri + authority if aad_authority_uri.endswith("/") else aad_authority_uri + "/" + authority
)
self._kusto_cluster = "{0.scheme}://{0.hostname}".format(urlparse(kcsb.data_source))
self._adal_context = AuthenticationContext(full_authority_uri)
self._username = None
if all([kcsb.aad_user_id, kcsb.password]):
self._authentication_method = AuthenticationMethod.aad_username_password
self._client_id = "db662dc1-0cfe-4e1c-a843-19a68e65be58"
self._username = kcsb.aad_user_id
self._password = kcsb.password
elif all([kcsb.application_client_id, kcsb.application_key]):
self._authentication_method = AuthenticationMethod.aad_application_key
self._client_id = kcsb.application_client_id
self._client_secret = kcsb.application_key
elif all([kcsb.application_client_id, kcsb.application_certificate, kcsb.application_certificate_thumbprint]):
self._authentication_method = AuthenticationMethod.aad_application_certificate
self._client_id = kcsb.application_client_id
self._certificate = kcsb.application_certificate
self._thumbprint = kcsb.application_certificate_thumbprint
else:
def _create_adal_context(self):
authority_url = self.cloud_environment.endpoints.active_directory
is_adfs = bool(re.match('.+(/adfs|/adfs/)$', authority_url, re.I))
if is_adfs:
authority_url = authority_url.rstrip('/') # workaround: ADAL is known to reject auth urls with trailing /
else:
authority_url = authority_url + '/' + self._tenant
self._context = adal.AuthenticationContext(
authority_url,
timeout=self._timeout,
verify_ssl=self._verify,
proxies=self._proxies,
validate_authority=not is_adfs,
cache=self._cache,
api_version=None
)
parameters = f.read()
sample_parameters = json.loads(parameters)
else:
raise ValueError('Please provide parameter file with account information.')
authority_url = (sample_parameters['authorityHostUrl'] + '/' +
sample_parameters['tenant'])
GRAPH_RESOURCE = '00000002-0000-0000-c000-000000000000'
RESOURCE = sample_parameters.get('resource', GRAPH_RESOURCE)
#uncomment for verbose logging
turn_on_logging()
### Main logic begins
context = adal.AuthenticationContext(authority_url)
key = get_private_key(sample_parameters['privateKeyFile'])
token = context.acquire_token_with_client_certificate(
RESOURCE,
sample_parameters['clientId'],
key,
sample_parameters['thumbprint'])
### Main logic ends
print('Here is the token:')
print(json.dumps(token, indent=2))
def refresh_auth_tokens(self, batch_token, mgmt_token):
context = adal.AuthenticationContext(self.aad_environment_provider.getAadAuthorityHostUrl(self.aad_environment_id) + '/' + self.aad_tenant_name, api_version=None)
try:
self.mgmt_auth_token = context.acquire_token_with_refresh_token(
mgmt_token['refreshToken'],
self.aadClientId,
self.aad_environment_provider.getAadManagementUrl(self.aad_environment_id))
self.batch_auth_token = context.acquire_token_with_refresh_token(
batch_token['refreshToken'],
self.aadClientId,
self.aad_environment_provider.getBatchResourceUrl(self.aad_environment_id))
return True
except AdalError as exp:
errors = exp.error_response['error_codes']