Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_auth_invalid(self):
    """
    Auth requests naming an unavailable method must be rejected.

    Two cases are exercised against the same mock client: an unknown
    method name with the stock test config, and the 'ticket' method with
    a config whose AUTHENTICATION section has been emptied. Both must log
    the same "unsupported" error, and nothing else may be logged.
    """
    # Both scenarios are expected to produce this exact error payload.
    unsupported = {
        'status': 'error',
        'event': 'auth',
        'error': c.ERR_AUTH_UNSUPPORTED,
    }

    # Case 1: a method name that the test config does not offer.
    client = MockClient(SocketShark(TEST_CONFIG))
    await client.session.on_client_event({'event': 'auth', 'method': 'x'})
    assert client.log.pop() == unsupported

    # Case 2: 'ticket' requested, but no auth methods are configured.
    # A fresh Session is bound to the existing client so its log is reused.
    bare_config = TEST_CONFIG.copy()
    bare_config['AUTHENTICATION'] = {}
    bare_session = Session(SocketShark(bare_config), client)
    await bare_session.on_client_event({'event': 'auth', 'method': 'ticket'})
    assert client.log.pop() == unsupported

    # No stray messages may remain after the two expected errors.
    assert not client.log
# Handle an auth event: check that the requested method is configured,
# require a ticket, validate it over HTTP, and collect the configured auth
# fields from the validation response.
# NOTE(review): the visible text ends right after `auth_info` is built and
# nothing uses it here, so the definition presumably continues beyond this
# excerpt -- confirm against the full file.
async def process(self):
# Methods missing from the auth config are rejected as unsupported.
if self.method not in self.auth_config:
raise EventError(c.ERR_AUTH_UNSUPPORTED)
# The only supported method.
# NOTE(review): `assert` statements are removed when Python runs with -O,
# so a configured method other than 'ticket' would then fall through to the
# ticket flow below -- consider an explicit raise instead.
assert self.method == 'ticket'
auth_method_config = self.auth_config[self.method]
# A missing or empty ticket is reported to the caller as ERR_NEEDS_TICKET.
ticket = self.data.get('ticket')
if not ticket:
raise EventError(c.ERR_NEEDS_TICKET)
# Both keys are required; a method config lacking either raises KeyError.
auth_url = auth_method_config['validation_url']
auth_fields = auth_method_config['auth_fields']
# POST the ticket to the validation URL. `http_post` is defined elsewhere;
# its result is used as a mapping below (presumably the decoded response
# body -- TODO confirm).
result = await http_post(self.shark, auth_url, {'ticket': ticket})
# Any status other than 'ok' fails auth, preferring the error supplied in
# the response and falling back to ERR_AUTH_FAILED.
if result.get('status') != 'ok':
raise EventError(result.get('error', c.ERR_AUTH_FAILED))
# Keep only the configured fields from the response.
# NOTE(review): `result[field]` is an unguarded lookup, so a response that
# omits a configured field would raise (KeyError if `result` is a dict)
# rather than EventError -- verify that is intended.
auth_info = {field: result[field] for field in auth_fields}