Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_krbLogin(self, base64, socket, context):
# TODO
s, cntext, cursor = self.get_session()
context.cnx = cntext.cnx
with self.assertRaises(koji.AuthError) as cm:
s.krbLogin('krb_req', 'proxyuser')
self.assertEqual(cm.exception.args[0], 'Already logged in')
s.logged_in = False
if six.PY3:
with self.assertRaises(koji.AuthError) as cm:
s.krbLogin('krb_req', 'proxyuser')
self.assertEqual(cm.exception.args[0], 'krbV module not installed')
else:
with mock.patch('koji.auth.krbV', create=True) as krbV:
princ = mock.MagicMock()
princ.name = 'princ_name'
krbV.default_context.return_value \
.rd_req.return_value = (mock.MagicMock(), 2, 3,
[1, 2, princ])
with self.assertRaises(koji.AuthError) as cm:
def login(self, user, password, opts=None):
"""create a login session"""
if opts is None:
opts = {}
if not isinstance(password, str) or len(password) == 0:
raise koji.AuthError('invalid username or password')
if self.logged_in:
raise koji.GenericError("Already logged in")
hostip = self.get_remote_ip(override=opts.get('hostip'))
# check passwd
c = context.cnx.cursor()
q = """SELECT id FROM users
WHERE name = %(user)s AND password = %(password)s"""
c.execute(q, locals())
r = c.fetchone()
if not r:
raise koji.AuthError('invalid username or password')
user_id = r[0]
self.checkLoginAllowed(user_id)
'EXTRACT(EPOCH FROM update_time)': 'update_ts',
'user_id': 'user_id',
}
# sort for stability (unittests)
fields, aliases = zip(*sorted(fields.items(), key=lambda x: x[1]))
q = """
SELECT %s FROM sessions
WHERE id = %%(id)i
AND key = %%(key)s
AND hostip = %%(hostip)s
FOR UPDATE
""" % ",".join(fields)
c.execute(q, locals())
row = c.fetchone()
if not row:
raise koji.AuthError('Invalid session or bad credentials')
session_data = dict(zip(aliases, row))
#check for expiration
if session_data['expired']:
raise koji.AuthExpired('session "%i" has expired' % id)
#check for callnum sanity
if callnum is not None:
try:
callnum = int(callnum)
except (ValueError, TypeError):
raise koji.AuthError("Invalid callnum: %r" % callnum)
lastcall = session_data['callnum']
if lastcall is not None:
if lastcall > callnum:
raise koji.SequenceError("%d > %d (session %d)" \
% (lastcall, callnum, id))
elif lastcall == callnum:
def checkLoginAllowed(self, user_id):
"""Verify that the user is allowed to login"""
cursor = context.cnx.cursor()
query = """SELECT name, usertype, status FROM users WHERE id = %(user_id)i"""
cursor.execute(query, locals())
result = cursor.fetchone()
if not result:
raise koji.AuthError('invalid user_id: %s' % user_id)
name, usertype, status = result
if status != koji.USER_STATUS['NORMAL']:
raise koji.AuthError('logins by %s are not allowed' % name)
def krbLogin(self, krb_req, proxyuser=None):
"""Authenticate the user using the base64-encoded
AP_REQ message in krb_req. If proxyuser is not None,
log in that user instead of the user associated with the
Kerberos principal. The principal must be an authorized
"proxy_principal" in the server config."""
if self.logged_in:
raise koji.AuthError("Already logged in")
if krbV is None:
# python3 is not supported
raise koji.AuthError("krbV module not installed")
if not (context.opts.get('AuthPrincipal') and context.opts.get('AuthKeytab')):
raise koji.AuthError('not configured for Kerberos authentication')
ctx = krbV.default_context()
srvprinc = krbV.Principal(name=context.opts.get('AuthPrincipal'), context=ctx)
srvkt = krbV.Keytab(name=context.opts.get('AuthKeytab'), context=ctx)
ac = krbV.AuthContext(context=ctx)
ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE|krbV.KRB5_AUTH_CONTEXT_DO_TIME
conninfo = self.getConnInfo()
ac.addrs = conninfo
raise koji.RetryError(
"unable to retry call %d (method %s) for session %d" \
% (callnum, method, id))
# read user data
#historical note:
# we used to get a row lock here as an attempt to maintain sanity of exclusive
# sessions, but it was an imperfect approach and the lock could cause some
# performance issues.
fields = ('name', 'status', 'usertype')
q = """SELECT %s FROM users WHERE id=%%(user_id)s""" % ','.join(fields)
c.execute(q, session_data)
user_data = dict(zip(fields, c.fetchone()))
if user_data['status'] != koji.USER_STATUS['NORMAL']:
raise koji.AuthError('logins by %s are not allowed' % user_data['name'])
#check for exclusive sessions
if session_data['exclusive']:
#we are the exclusive session for this user
self.exclusive = True
else:
#see if an exclusive session exists
q = """SELECT id FROM sessions WHERE user_id=%(user_id)s
AND "exclusive" = TRUE AND expired = FALSE"""
#should not return multiple rows (unique constraint)
c.execute(q, session_data)
row = c.fetchone()
if row:
(excl_id,) = row
if excl_id == session_data['master']:
#(note excl_id cannot be None)
#our master session has the lock
if atidx == -1:
raise koji.AuthError('invalid Kerberos principal: %s' % krb_principal)
user_name = krb_principal[:atidx]
# check if user already exists
c = context.cnx.cursor()
q = """SELECT krb_principal FROM users
WHERE name = %(user_name)s"""
c.execute(q, locals())
r = c.fetchone()
if not r:
return self.createUser(user_name, krb_principal=krb_principal)
else:
existing_user_krb = r[0]
if existing_user_krb is not None:
raise koji.AuthError('user %s already associated with other Kerberos principal: %s' % (user_name, existing_user_krb))
return self.setKrbPrincipal(user_name, krb_principal)
Raises:
RebaseHelperError: If login failed.
"""
config = koji.read_config(profile)
session = koji.ClientSession(config['server'], opts=config)
if not login:
return session
try:
session.gssapi_login()
except Exception: # pylint: disable=broad-except
pass
else:
return session
# fall back to kerberos login (doesn't work with python3)
exc = (koji.AuthError, koji.krbV.Krb5Error) if koji.krbV else koji.AuthError
try:
session.krb_login()
except exc as e:
raise RebaseHelperError('Login failed: {}'.format(str(e)))
else:
return session
def login(environ, page=None):
session = _getServer(environ)
options = environ['koji.options']
# try SSL first, fall back to Kerberos
if options['WebCert']:
if environ['wsgi.url_scheme'] != 'https':
dest = 'login'
if page:
dest = dest + '?page=' + page
_redirectBack(environ, dest, forceSSL=True)
return
if environ.get('SSL_CLIENT_VERIFY') != 'SUCCESS':
raise koji.AuthError('could not verify client: %s' % environ.get('SSL_CLIENT_VERIFY'))
# use the subject's common name as their username
username = environ.get('SSL_CLIENT_S_DN_CN')
if not username:
raise koji.AuthError('unable to get user information from client certificate')
if not _sslLogin(environ, session, username):
raise koji.AuthError('could not login %s using SSL certificates' % username)
authlogger.info('Successful SSL authentication by %s', username)
elif options['WebPrincipal']:
principal = environ.get('REMOTE_USER')
if not principal:
raise koji.AuthError('configuration error: mod_auth_gssapi should have performed authentication before presenting this page')