Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
cfg = Configuration("https://my_service.com")
cfg.retry_policy.backoff_factor=0
cfg.redirect_policy.max_redirects=2
cfg.credentials = Authentication()
self.client = ServiceClient(None, cfg)
return super(TestRedirect, self).setUp()
# Allow overriding resource manager URI using environment variable
# for testing purposes. Subscription id is also determined by environment
# variable.
rm_uri_override = getenv(RM_URI_OVERRIDE)
if rm_uri_override:
client_id = getenv(CLIENT_ID)
if client_id:
from azure.common.credentials import ServicePrincipalCredentials
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=getenv(CLIENT_SECRET),
tenant=getenv(TENANT_ID))
else:
from msrest.authentication import Authentication # pylint: disable=import-error
credentials = Authentication()
return MariaDBManagementClient(
subscription_id=getenv(SUB_ID_OVERRIDE),
base_url=rm_uri_override,
credentials=credentials)
# Normal production scenario.
return get_mgmt_service_client(cli_ctx, MariaDBManagementClient)
# Allow overriding resource manager URI using environment variable
# for testing purposes. Subscription id is also determined by environment
# variable.
rm_uri_override = getenv(RM_URI_OVERRIDE)
if rm_uri_override:
client_id = getenv(CLIENT_ID)
if client_id:
from azure.common.credentials import ServicePrincipalCredentials
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=getenv(CLIENT_SECRET),
tenant=getenv(TENANT_ID))
else:
from msrest.authentication import Authentication # pylint: disable=import-error
credentials = Authentication()
return MySQLManagementClient(
subscription_id=getenv(SUB_ID_OVERRIDE),
base_url=rm_uri_override,
credentials=credentials)
# Normal production scenario.
return get_mgmt_service_client(cli_ctx, MySQLManagementClient)
# Allow overriding resource manager URI using environment variable
# for testing purposes. Subscription id is also determined by environment
# variable.
rm_uri_override = getenv(RM_URI_OVERRIDE)
if rm_uri_override:
client_id = getenv(CLIENT_ID)
if client_id:
from azure.common.credentials import ServicePrincipalCredentials
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=getenv(CLIENT_SECRET),
tenant=getenv(TENANT_ID))
else:
from msrest.authentication import Authentication # pylint: disable=import-error
credentials = Authentication()
return MySQLManagementClient(
subscription_id=getenv(SUB_ID_OVERRIDE),
base_url=rm_uri_override,
credentials=credentials)
# Normal production scenario.
return get_mgmt_service_client(cli_ctx, MySQLManagementClient)
# Allow overriding resource manager URI using environment variable
# for testing purposes. Subscription id is also determined by environment
# variable.
rm_uri_override = getenv(RM_URI_OVERRIDE)
if rm_uri_override:
client_id = getenv(CLIENT_ID)
if client_id:
from azure.common.credentials import ServicePrincipalCredentials
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=getenv(CLIENT_SECRET),
tenant=getenv(TENANT_ID))
else:
from msrest.authentication import Authentication # pylint: disable=import-error
credentials = Authentication()
return PostgreSQLManagementClient(
subscription_id=getenv(SUB_ID_OVERRIDE),
base_url=rm_uri_override,
credentials=credentials)
# Normal production scenario.
return get_mgmt_service_client(cli_ctx, PostgreSQLManagementClient)
sign_key = '%s\n%d' % (encoded_uri, ttl)
signature = b64encode(HMAC(b64decode(self.key), sign_key.encode('utf-8'), sha256).digest())
result = {
'sr': self.uri,
'sig': signature,
'se': str(ttl)
}
if self.policy:
result['skn'] = self.policy
return 'SharedAccessSignature ' + urlencode(result)
class BasicSasTokenAuthentication(Authentication):
"""
Basic Shared Access Signature authorization for Azure IoT Hub.
Args:
sas_token (str): sas token to use in authentication.
"""
def __init__(self, sas_token):
self.sas_token = sas_token
def signed_session(self):
"""
Create requests session with SAS auth headers.
Returns:
session (): requests.Session.
"""
def signed_session(self, session=None):
# type: (Optional[requests.Session]) -> requests.Session
"""Create requests session with any required auth headers applied.
If a session object is provided, configure it directly. Otherwise,
create a new session and return it.
:param session: The session to configure for authentication
:type session: requests.Session
:rtype: requests.Session
"""
return session or requests.Session()
class BasicAuthentication(Authentication):
"""Implementation of Basic Authentication.
:param str username: Authentication username.
:param str password: Authentication password.
"""
def __init__(self, username, password):
# type: (str, str) -> None
self.scheme = 'Basic'
self.username = username
self.password = password
def signed_session(self, session=None):
# type: (Optional[requests.Session]) -> requests.Session
"""Create requests session with any required auth headers
applied.