Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_negative_no_root(self):
user_realm_response = 'noroot'
httpretty.register_uri(httpretty.GET, uri=self.testUrl, body=user_realm_response, status=200)
user_realm = adal.user_realm.UserRealm(cp['callContext'], self.user, self.authority)
def _callback(err):
self.assertIsNotNone(err,'Did not receive expected error')
user_realm.discover(_callback)
util.match_standard_request_headers(httpretty.last_request())
def test_negative_empty_json(self):
user_realm_response = '{}'
httpretty.register_uri(httpretty.GET, uri=self.testUrl, body=user_realm_response, status=200)
user_realm = adal.user_realm.UserRealm(cp['callContext'], self.user, self.authority)
def _callback(err):
self.assertIsNotNone(err,'Did not receive expected error')
user_realm.discover(_callback)
util.match_standard_request_headers(httpretty.last_request())
def test_happy_path_federated(self):
user_realm_response = '{\"account_type\":\"Federated\",\"federation_protocol\":\"wstrust\",\"federation_metadata_url\":\"https://adfs.federatedtenant.com/adfs/services/trust/mex\",\"federation_active_auth_url\":\"https://adfs.federatedtenant.com/adfs/services/trust/2005/usernamemixed\",\"ver\":\"0.8\"}';
httpretty.register_uri(httpretty.GET, uri=self.testUrl, body=user_realm_response, status=200)
user_realm = adal.user_realm.UserRealm(cp['callContext'], self.user, self.authority)
def _callback(err):
self.assertIsNone(err, "Error raised during function: {0}".format(err))
self.assertEqual(user_realm.federation_metadata_url, 'https://adfs.federatedtenant.com/adfs/services/trust/mex',
'Returned Mex URL does not match expected value: {0}'.format(user_realm.federation_metadata_url))
self.assertAlmostEqual(user_realm.federation_active_auth_url, 'https://adfs.federatedtenant.com/adfs/services/trust/2005/usernamemixed',
'Returned active auth URL does not match expected value: {0}'.format(user_realm.federation_active_auth_url))
user_realm.discover(_callback)
util.match_standard_request_headers(httpretty.last_request())
def test_negative_wrong_field(self):
user_realm_response = '{\"account_type\":\"Manageddf\",\"federation_protocol\":\"SAML20fgfg\",\"federation_metadata\":\"https://adfs.federatedtenant.com/adfs/services/trust/mex\",\"federation_active_auth_url\":\"https://adfs.federatedtenant.com/adfs/services/trust/2005/usernamemixed\",\"version\":\"0.8\"}';
httpretty.register_uri(httpretty.GET, uri=self.testUrl, body=user_realm_response, status=200)
user_realm = adal.user_realm.UserRealm(cp['callContext'], self.user, self.authority)
def _callback(err):
self.assertIsNotNone(err,'Did not receive expected error')
user_realm.discover(_callback)
util.match_standard_request_headers(httpretty.last_request())
def create_user_realm_stub(self, protocol, accountType, mexUrl, wstrustUrl, err=None):
userRealm = UserRealm(cp['callContext'], '', '')
userRealm.discover = mock.MagicMock()
userRealm._federationProtocol = protocol
userRealm._accountType = accountType
userRealm._federationMetadataUrl = mexUrl
userRealm.federation_metadata_url = mexUrl
userRealm._federationActiveAuthUrl = wstrustUrl
userRealm.federation_active_auth_url = wstrustUrl
userRealm.account_type = accountType
return userRealm
try:
response = json.loads(body)
except ValueError:
self._log.info(
"Parsing realm discovery response JSON failed for body: %(body)s",
{"body": body})
raise
account_type = UserRealm._validate_account_type(response['account_type'])
if not account_type:
raise AdalError('Cannot parse account_type: {}'.format(account_type))
self.account_type = account_type
if self.account_type == ACCOUNT_TYPE['Federated']:
protocol = UserRealm._validate_federation_protocol(response['federation_protocol'])
if not protocol:
raise AdalError('Cannot parse federation protocol: {}'.format(protocol))
self.federation_protocol = protocol
self.federation_metadata_url = response['federation_metadata_url']
self.federation_active_auth_url = response['federation_active_auth_url']
self._log_parsed_response()
def _parse_discovery_response(self, body):
self._log.debug("Discovery response:\n %(discovery_response)s",
{"discovery_response": body})
try:
response = json.loads(body)
except ValueError:
self._log.info(
"Parsing realm discovery response JSON failed for body: %(body)s",
{"body": body})
raise
account_type = UserRealm._validate_account_type(response['account_type'])
if not account_type:
raise AdalError('Cannot parse account_type: {}'.format(account_type))
self.account_type = account_type
if self.account_type == ACCOUNT_TYPE['Federated']:
protocol = UserRealm._validate_federation_protocol(response['federation_protocol'])
if not protocol:
raise AdalError('Cannot parse federation protocol: {}'.format(protocol))
self.federation_protocol = protocol
self.federation_metadata_url = response['federation_metadata_url']
self.federation_active_auth_url = response['federation_active_auth_url']
self._log_parsed_response()
def _create_user_realm_request(self, username):
return user_realm.UserRealm(self._call_context,
username,
self._authentication_context.authority.url)
@staticmethod
def _validate_federation_protocol(protocol):
return UserRealm._validate_constant_value(FEDERATION_PROTOCOL_TYPE, protocol)