How to use the adal.AuthenticationContext function in adal

To help you get started, we’ve selected a few adal examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Azure / azure-linux-extensions / OmsAgent / extension-test / verify_e2e.py View on Github external
failed_sources = []
    success_sources = []

    with open('{0}/parameters.json'.format(os.getcwd()), 'r') as f:
        parameters = f.read()
        if re.search(r'"<.*>"', parameters):
            print('Please replace placeholders in parameters.json')
            exit()
        parameters = json.loads(parameters)

    key_vault = parameters['key vault']
    tenant_id = str(json.loads(subprocess.check_output('az keyvault secret show --name tenant-id --vault-name {0}'.format(key_vault), shell=True))["value"])
    app_id = str(json.loads(subprocess.check_output('az keyvault secret show --name app-id --vault-name {0}'.format(key_vault), shell=True))["value"])
    app_secret = str(json.loads(subprocess.check_output('az keyvault secret show --name app-secret --vault-name {0}'.format(key_vault), shell=True))["value"])
    authority_url = parameters['authority host url'] + '/' + tenant_id
    context = adal.AuthenticationContext(authority_url)
    token = context.acquire_token_with_client_credentials(
        parameters['resource'],
        app_id,
        app_secret)

    head = {'Authorization': 'Bearer ' + token['accessToken']}
    subscription = str(json.loads(subprocess.check_output('az keyvault secret show --name subscription-id --vault-name {0}'.format(key_vault), shell=True))["value"])
    resource_group = parameters['resource group']
    workspace = parameters['workspace']
    url = ENDPOINT.format(subscription, resource_group, workspace)

    sources = ['Heartbeat', 'Syslog', 'Perf', 'ApacheAccess_CL', 'MySQL_CL', 'Custom_Log_CL']
    distro = hostname.split('-')[0]
    results = {}
    results[distro] = {}
github AzureAD / azure-activedirectory-library-for-python / tests / test_username_password.py View on Github external
        @unittest.skip('https://github.com/AzureAD/azure-activedirectory-library-for-python-priv/issues/21')
        @httpretty.activate
        def test_managed_happy_path(self):
            """Check that a username/password token request returns the expected response.

            The test is currently skipped; the reason is tracked in the issue
            linked in the ``unittest.skip`` decorator above.
            """
            # presumably ``False`` selects the managed (non-federated) user realm
            # response -- TODO confirm against util.
            util.setup_expected_user_realm_response_common(False)
            response = util.create_response()

            authorityUrl = response['authority'] + '/' + cp['tenant']
            # Only the call matters here; its return value was never used, so it is
            # no longer bound to a local.
            self.setup_expected_username_password_request_response(200, response['wireResponse'], authorityUrl)

            context = adal.AuthenticationContext(authorityUrl)
            token_response = context.acquire_token_with_username_password(
                response['resource'],
                user_pass_params['username'],
                user_pass_params['password'],
                client_cred_params['clientId'])

            # NOTE(review): confirm that util really exposes the camelCase
            # ``isMatchTokenResponse`` before un-skipping this test.
            #
            # The failure message used to call ``JSON.stringify`` (a JavaScript
            # leftover), which raises NameError before assertTrue even runs because
            # the message is built eagerly. ``str`` cannot fail on the response.
            self.assertTrue(
                util.isMatchTokenResponse(response['cachedResponse'], token_response),
                'Response did not match expected: ' + str(token_response))
github AzureAD / azure-activedirectory-library-for-python / tests / test_authorization_code.py View on Github external
    @httpretty.activate
    def test_happy_path(self):
        """Acquire a token with an authorization code and check the result."""
        expected = util.create_response()
        self.setup_expected_auth_code_token_request_response(200, expected['wireResponse'])

        auth_context = adal.AuthenticationContext(cp['authUrl'])
        actual = auth_context.acquire_token_with_authorization_code(
            self.authorization_code,
            self.redirect_uri,
            expected['resource'],
            cp['clientId'],
            cp['clientSecret'],
        )

        # Compare the returned token against the decoded form of the canned response.
        tokens_match = util.is_match_token_response(expected['decodedResponse'], actual)
        self.assertTrue(tokens_match, 'The response did not match what was expected')

        # Check the headers of the last request captured by httpretty.
        util.match_standard_request_headers(httpretty.last_request())
github andyt530 / py-az2tf / runbook / inline / runbook_auth.py View on Github external
# Get the Azure Automation RunAs service principal certificate
    cert = automationassets.get_automation_certificate("AzureRunAsCertificate")
    sp_cert = crypto.load_pkcs12(cert)
    pem_pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, sp_cert.get_privatekey())

    # Get run as connection information for the Azure Automation service principal
    runas_connection = automationassets.get_automation_connection("AzureRunAsConnection")
    application_id = runas_connection["ApplicationId"]
    thumbprint = runas_connection["CertificateThumbprint"]
    tenant_id = runas_connection["TenantId"]

    # Authenticate with service principal certificate
    resource = "https://management.core.windows.net/"
    authority_url = ("https://login.microsoftonline.com/" + tenant_id)
    context = adal.AuthenticationContext(authority_url)
    azure_credential = context.acquire_token_with_client_certificate(
        resource,
        application_id,
        pem_pkey,
        thumbprint)
    
    # Return the token
    return azure_credential.get('accessToken')
github microsoft / jupyter-Kqlmagic / azure / Kqlmagic / my_aad_helper.py View on Github external
expiration_datetime = dateutil.parser.parse(expires_on)
            else:
                expiration_datetime = datetime.now() + timedelta(minutes=30)

            current_datetime = datetime.now() + timedelta(minutes=1)
            if expiration_datetime > current_datetime:
                valid_token = token
                logger().debug(f"_MyAadHelper::_validate_and_refresh_token - succeeded, no need to refresh yet, expires on {expires_on} - resource: '{resource}'")
            else:
                logger().debug(f"_MyAadHelper::_validate_and_refresh_token - token expires on {expires_on} need to refresh - resource: '{resource}'")
                refresh_token = self._get_token_refresh_token(token)
                if refresh_token is not None:
                    try:
                        if self._current_adal_context is None:
                            authority_uri = self._get_token_authority(token) or self._get_authority_from_token(token) or self._authority_uri
                            self._current_adal_context = AuthenticationContext(authority_uri, cache=None)
                        client_id = self._get_token_client_id(token) or self._get_client_id_from_token(token) or self._client_id
                        valid_token = self._current_adal_context.acquire_token_with_refresh_token(refresh_token, client_id, resource)
                    except Exception as e:
                        self._warn_on_token_validation_failure(f"access token expired on {expires_on}, failed to refresh access token, Exception: {e}")

                    logger().debug(f"_MyAadHelper::_validate_and_refresh_token - {'failed' if token is None else 'succeeded'} to refresh token - resource: '{resource}'")
                else:
                    logger().debug(f"_MyAadHelper::_validate_and_refresh_token - failed to refresh expired token, token doesn't contain refresh token - resource: '{resource}'")
                    self._warn_on_token_validation_failure(f"access token expired on {expires_on}, and token entry has no refresh_token")

        return valid_token
github Azure / azure-kusto-python / azure-kusto-data / azure / kusto / data / security.py View on Github external
if any([kcsb.user_token, kcsb.application_token]):
            self._token = kcsb.user_token or kcsb.application_token
            self._authentication_method = AuthenticationMethod.aad_token
            return

        authority = kcsb.authority_id or "common"

        aad_authority_uri = (
            os.environ["AadAuthorityUri"] if "AadAuthorityUri" in os.environ else "https://login.microsoftonline.com/"
        )
        full_authority_uri = (
            aad_authority_uri + authority if aad_authority_uri.endswith("/") else aad_authority_uri + "/" + authority
        )

        self._kusto_cluster = "{0.scheme}://{0.hostname}".format(urlparse(kcsb.data_source))
        self._adal_context = AuthenticationContext(full_authority_uri)
        self._username = None
        if all([kcsb.aad_user_id, kcsb.password]):
            self._authentication_method = AuthenticationMethod.aad_username_password
            self._client_id = "db662dc1-0cfe-4e1c-a843-19a68e65be58"
            self._username = kcsb.aad_user_id
            self._password = kcsb.password
        elif all([kcsb.application_client_id, kcsb.application_key]):
            self._authentication_method = AuthenticationMethod.aad_application_key
            self._client_id = kcsb.application_client_id
            self._client_secret = kcsb.application_key
        elif all([kcsb.application_client_id, kcsb.application_certificate, kcsb.application_certificate_thumbprint]):
            self._authentication_method = AuthenticationMethod.aad_application_certificate
            self._client_id = kcsb.application_client_id
            self._certificate = kcsb.application_certificate
            self._thumbprint = kcsb.application_certificate_thumbprint
        else:
github Azure / azure-sdk-for-python / sdk / core / azure-mgmt-core / azure / mgmt / core / azure_active_directory.py View on Github external
def _create_adal_context(self):
        """Create the ADAL authentication context and store it on ``self._context``.

        The authority URL is taken from the cloud environment's Active Directory
        endpoint. When that endpoint ends in ``/adfs`` or ``/adfs/``
        (case-insensitive), the trailing slash is stripped and authority
        validation is turned off; otherwise the tenant is appended to it.
        """
        base_url = self.cloud_environment.endpoints.active_directory
        is_adfs = re.match('.+(/adfs|/adfs/)$', base_url, re.I) is not None

        # workaround: ADAL is known to reject auth urls with trailing /
        authority_url = (
            base_url.rstrip('/') if is_adfs else base_url + '/' + self._tenant
        )

        context_options = dict(
            timeout=self._timeout,
            verify_ssl=self._verify,
            proxies=self._proxies,
            validate_authority=not is_adfs,
            cache=self._cache,
            api_version=None,
        )
        self._context = adal.AuthenticationContext(authority_url, **context_options)
github AzureAD / azure-activedirectory-library-for-python / sample / certificate_credentials_sample.py View on Github external
parameters = f.read()
    sample_parameters = json.loads(parameters)
else:
    raise ValueError('Please provide parameter file with account information.')


authority_url = (sample_parameters['authorityHostUrl'] + '/' +
                 sample_parameters['tenant'])
GRAPH_RESOURCE = '00000002-0000-0000-c000-000000000000'
RESOURCE = sample_parameters.get('resource', GRAPH_RESOURCE)

#uncomment for verbose logging
turn_on_logging()

### Main logic begins
context = adal.AuthenticationContext(authority_url)
key = get_private_key(sample_parameters['privateKeyFile'])

token = context.acquire_token_with_client_certificate(
    RESOURCE,
    sample_parameters['clientId'],
    key,
    sample_parameters['thumbprint'])
### Main logic ends

print('Here is the token:')
print(json.dumps(token, indent=2))
github Azure / azure-batch-maya / azure_batch_maya / scripts / config.py View on Github external
def refresh_auth_tokens(self, batch_token, mgmt_token):
        """Refresh the management and Batch auth tokens from their refresh tokens.

        Builds an ADAL context from the AAD authority host URL and tenant name,
        then exchanges each token's ``'refreshToken'`` entry for a new token.
        The results are stored on ``self.mgmt_auth_token`` and
        ``self.batch_auth_token``.

        :param batch_token: mapping with a ``'refreshToken'`` entry for the Batch resource.
        :param mgmt_token: mapping with a ``'refreshToken'`` entry for the management resource.
        :returns: ``True`` when both refresh calls succeed.

        NOTE(review): the ``AdalError`` handler appears to be cut off in this
        excerpt; what happens after the error codes are read is not visible here.
        """

        context = adal.AuthenticationContext(self.aad_environment_provider.getAadAuthorityHostUrl(self.aad_environment_id) + '/' + self.aad_tenant_name, api_version=None)

        try:
            # Management token first, then the Batch token; both use the same client id.
            self.mgmt_auth_token = context.acquire_token_with_refresh_token(
                mgmt_token['refreshToken'],
                self.aadClientId,
                self.aad_environment_provider.getAadManagementUrl(self.aad_environment_id))

            self.batch_auth_token =  context.acquire_token_with_refresh_token(
                batch_token['refreshToken'],
                self.aadClientId,
                self.aad_environment_provider.getBatchResourceUrl(self.aad_environment_id))

            return True

        except AdalError as exp:
            # Read the error codes from the ADAL error response.
            errors = exp.error_response['error_codes']

adal

Note: This library has been replaced by MSAL Python, available here: https://pypi.org/project/msal/. ADAL Python remains available here as a legacy library. The ADAL for Python library makes it easy for Python applications to authenticate to Azure Active Directory (AAD) in order to access AAD-protected web resources.

MIT
Latest version published 4 years ago

Package Health Score

58 / 100
Full package analysis

Similar packages