Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_msi_credentials(
cloud: msrestazure.azure_cloud.Cloud,
resource_id: str = None
) -> msrestazure.azure_active_directory.MSIAuthentication:
"""Create MSI credentials
:param cloud: cloud kind
:param resource_id: resource id to auth against
:return: MSI auth object
"""
if is_not_empty(resource_id):
creds = msrestazure.azure_active_directory.MSIAuthentication(
cloud_environment=cloud,
resource=resource_id,
)
else:
creds = msrestazure.azure_active_directory.MSIAuthentication(
cloud_environment=cloud,
)
return creds
# Get the Azure Automation RunAs service principal certificate
cert = automationassets.get_automation_certificate("AzureRunAsCertificate")
sp_cert = crypto.load_pkcs12(cert)
pem_pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, sp_cert.get_privatekey())
# Get run as connection information for the Azure Automation service principal
application_id = runas_connection["ApplicationId"]
thumbprint = runas_connection["CertificateThumbprint"]
tenant_id = runas_connection["TenantId"]
# Authenticate with service principal certificate
resource = "https://management.core.windows.net/"
authority_url = ("https://login.microsoftonline.com/" + tenant_id)
context = adal.AuthenticationContext(authority_url)
return azure_active_directory.AdalAuthentication(
lambda: context.acquire_token_with_client_certificate(
resource,
application_id,
pem_pkey,
thumbprint)
)
# create credential object
if (util.is_not_empty(aad_application_id) and
util.is_not_empty(aad_cert_private_key)):
if util.is_not_empty(aad_auth_key):
raise ValueError('cannot specify both cert auth and auth key')
if util.is_not_empty(aad_password):
raise ValueError('cannot specify both cert auth and password')
if settings.verbose(ctx.config):
logger.debug(
('using aad auth with certificate, auth={} endpoint={} '
'directoryid={} appid={} cert_thumbprint={}').format(
aad_authority_url, endpoint, aad_directory_id,
aad_application_id, aad_cert_thumbprint))
context = adal.AuthenticationContext(
'{}/{}'.format(aad_authority_url, aad_directory_id))
return msrestazure.azure_active_directory.AdalAuthentication(
lambda: context.acquire_token_with_client_certificate(
endpoint,
aad_application_id,
util.decode_string(open(aad_cert_private_key, 'rb').read()),
aad_cert_thumbprint
)
)
elif util.is_not_empty(aad_auth_key):
if util.is_not_empty(aad_password):
raise ValueError(
'Cannot specify both an AAD Service Principal and User')
if settings.verbose(ctx.config):
logger.debug(
('using aad auth with key, auth={} endpoint={} '
'directoryid={} appid={}').format(
aad_authority_url, endpoint, aad_directory_id,
aad_cert_thumbprint
)
)
elif util.is_not_empty(aad_auth_key):
if util.is_not_empty(aad_password):
raise ValueError(
'Cannot specify both an AAD Service Principal and User')
if settings.verbose(ctx.config):
logger.debug(
('using aad auth with key, auth={} endpoint={} '
'directoryid={} appid={}').format(
aad_authority_url, endpoint, aad_directory_id,
aad_application_id))
context = adal.AuthenticationContext(
'{}/{}'.format(aad_authority_url, aad_directory_id))
return msrestazure.azure_active_directory.AdalAuthentication(
context.acquire_token_with_client_credentials,
endpoint,
aad_application_id,
aad_auth_key,
)
elif util.is_not_empty(aad_password):
if settings.verbose(ctx.config):
logger.debug(
('using aad auth with username and password, auth={} '
'endpoint={} directoryid={} username={}').format(
aad_authority_url, endpoint, aad_directory_id, aad_user))
try:
return azure.common.credentials.UserPassCredentials(
username=aad_user,
password=aad_password,
tenant=aad_directory_id,
# Get the Azure Automation RunAs service principal certificate
cert = automationassets.get_automation_certificate("AzureRunAsCertificate")
sp_cert = crypto.load_pkcs12(cert)
pem_pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, sp_cert.get_privatekey())
# Get run as connection information for the Azure Automation service principal
application_id = runas_connection["ApplicationId"]
thumbprint = runas_connection["CertificateThumbprint"]
tenant_id = runas_connection["TenantId"]
# Authenticate with service principal certificate
resource = "https://management.core.windows.net/"
authority_url = ("https://login.microsoftonline.com/" + tenant_id)
context = adal.AuthenticationContext(authority_url)
return azure_active_directory.AdalAuthentication(
lambda: context.acquire_token_with_client_certificate(
resource,
application_id,
pem_pkey,
thumbprint)
)