Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_settings_none(self):
current_options = adal_logging.get_logging_options()
adal_logging.set_logging_options()
options = adal_logging.get_logging_options()
adal_logging.set_logging_options(current_options)
noOptions = len(options) == 1 and options['level'] == 'ERROR'
self.assertTrue(noOptions, 'Did not expect to find any logging options set: ' + json.dumps(options))
def turn_on_logging(level='DEBUG', handler = None):
log.set_logging_options({
'level' : level,
'handler' : handler
})
def turn_on_logging(level=log.LOGGING_LEVEL.DEBUG):
log.set_logging_options({'level':level})
def create_empty_adal_object():
context = log.create_log_context()
component = 'TEST'
logger = log.Logger(component, context)
call_context = {'log_context' : context }
adal_object = { 'log' : logger, 'call_context' : call_context }
return adal_object
def __init__(self, call_context, authority):
self._token_endpoint = authority.token_endpoint
self._device_code_endpoint = authority.device_code_endpoint
self._log = log.Logger("OAuth2Client", call_context['log_context'])
self._call_context = call_context
self._cancel_polling_request = False
def __init__(self, call_context, authority, resource, client_id, cache,
refresh_function):
self._call_context = call_context
self._log = log.Logger("OAuth2Client", call_context['log_context'])
self._authority = authority
self._resource = resource
self._client_id = client_id
self._cache = cache
self._refresh_function = refresh_function
def __init__(self, call_context, authentication_context, client_id,
resource):
self._log = log.Logger("CodeRequest", call_context['log_context'])
self._call_context = call_context
self._authentication_context = authentication_context
self._client_id = client_id
self._resource = resource
def __init__(self, call_context, watrust_endpoint_url, applies_to, wstrust_endpoint_version):
self._log = log.Logger('WSTrustRequest', call_context['log_context'])
self._call_context = call_context
self._wstrust_endpoint_url = watrust_endpoint_url
self._applies_to = applies_to
self._wstrust_endpoint_version = wstrust_endpoint_version
from whoville import config, utils, security
import base64
__all__ = ['create_libcloud_session', 'create_boto3_session', 'get_cloudbreak', 'get_k8s_join_string',
'deploy_instances', 'add_sec_rule_to_ec2_group', 'get_k8svm', 'initialize_k8s_minion',
'create_node', 'list_images', 'list_sizes_aws', 'list_networks', 'initialize_k8s_master',
'list_subnets', 'list_security_groups', 'list_keypairs', 'list_nodes', 'nuke_namespace',
'aws_get_static_ip', 'resolve_firewall_rules', 'ops_get_security_group', 'ops_get_ssh_key',
'list_sizes_ops', 'ops_get_hosting_infra', 'ops_define_base_machine', 'define_userdata_script',
'aws_clean_stacks', 'delete_aws_network', 'aws_get_ssh_key', 'aws_get_hosting_infra']
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
# ADAL for Azure is verbose, reducing output
adal.log.set_logging_options({'level': 'WARNING'})
horton = utils.Horton()
def create_libcloud_session():
provider = config.profile.get('platform')['provider']
cls = get_driver(getattr(Provider, provider))
params = config.profile.get('platform')
if not params:
raise ValueError("Profile not configured with Platform Parameters")
if provider == 'EC2':
return cls(
**{x: y for x, y in params.items() if x in ['key', 'secret', 'region']}
)
elif provider == 'AZURE_ARM':
return cls(tenant_id=params['tenant'],