Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def promote_user_to_admin(client, user_info):
"""Assuming user_info is the info for the logged-in user, promote them
to admin and leave them logged in.
"""
log_out_current_user(client)
admin = User.get(fn.Lower(User.name) == user_info['username'])
UserMetadata.create(uid=admin.uid, key='admin', value='1')
log_in_user(client, user_info)
def get_global_stylesheet(self):
if self.subtheme:
try:
css = SubStylesheet.select().join(Sub).where(fn.Lower(Sub.name) == self.subtheme.lower()).get()
except SubStylesheet.DoesNotExist:
return ''
return css.content
return ''
def sub_new_rss(sub):
""" RSS feed for /sub/new """
try:
sub = Sub.get(fn.Lower(Sub.name) == sub.lower())
except Sub.DoesNotExist:
abort(404)
fg = FeedGenerator()
fg.id(request.url)
fg.title('New posts from ' + sub.name)
fg.link(href=request.url_root, rel='alternate')
fg.link(href=request.url, rel='self')
posts = misc.getPostList(misc.postListQueryBase(noAllFilter=True).where(Sub.sid == sub.sid), 'new', 1).dicts()
return Response(misc.populate_feed(fg, posts).atom_str(pretty=True), mimetype='application/atom+xml')
def resend_confirmation_email():
if current_user.is_authenticated:
return redirect(url_for('home.index'))
form = ResendConfirmationForm()
if not form.validate():
return engine.get_template('user/resend_confirmation.html').render(
{'form': form,
'error': misc.get_errors(form, True)})
try:
email = normalize_email(form.email.data)
user = User.get(fn.Lower(User.email) == email.lower())
if user.status == UserStatus.PROBATION:
send_login_link_email(user)
return redirect(url_for('auth.confirm_registration'))
elif user.status == UserStatus.OK:
flash(_("Your email is already confirmed."), 'message')
return redirect(url_for('auth.login'))
except User.DoesNotExist:
pass
return redirect(url_for('user.recovery_email_sent'))
def get_user_by_email(self, email):
try:
return User.get(fn.Lower(User.email) == email.lower())
except User.DoesNotExist:
try:
um = UserMetadata.get((UserMetadata.key == 'pending_email') &
(fn.Lower(UserMetadata.value) == email.lower()))
return User.get(User.uid == um.uid)
except UserMetadata.DoesNotExist:
pass
if self.provider == 'KEYCLOAK':
users = self.keycloak_admin.get_users({"email": email})
for userdict in users:
try:
return User.get(User.name == userdict['username'])
except User.DoesNotExist:
return None
return None
def edit_post(sub, pid):
uid = get_jwt_identity()
content = request.json.get('content', None)
if not content:
return jsonify(msg="Content parameter required"), 400
if len(content) > 16384:
return jsonify(msg="Content is too long"), 400
try:
post = SubPost.select().join(Sub, JOIN.LEFT_OUTER).where(
(SubPost.pid == pid) & (fn.Lower(Sub.name) == sub.lower()))
post = post.where(SubPost.deleted == 0).get()
except SubPost.DoesNotExist:
return jsonify(msg="Post does not exist"), 404
if post.uid_id != uid:
return jsonify(msg="Unauthorized"), 403
if misc.is_sub_banned(sub, uid=uid):
return jsonify(msg='You are banned on this sub.'), 403
if (datetime.datetime.utcnow() - post.posted.replace(tzinfo=None)) > datetime.timedelta(days=config.site.archive_post_after):
return jsonify(msg='Post is archived'), 403
post.content = content
# Only save edited time if it was posted more than five minutes ago
if (datetime.datetime.utcnow() - post.posted.replace(tzinfo=None)).seconds > 300:
def post_voting(page, term):
""" WIP: View post voting habits """
if current_user.is_admin():
try:
user = User.get(fn.Lower(User.name) == term.lower())
msg = []
votes = SubPostVote.select(SubPostVote.positive, SubPostVote.pid, User.name, SubPostVote.datetime,
SubPostVote.pid)
votes = votes.join(SubPost, JOIN.LEFT_OUTER, on=SubPost.pid == SubPostVote.pid)
votes = votes.switch(SubPost).join(User, JOIN.LEFT_OUTER, on=SubPost.uid == User.uid)
votes = votes.where(SubPostVote.uid == user.uid).dicts()
except User.DoesNotExist:
votes = []
msg = 'user not found'
return render_template('admin/postvoting.html', page=page, msg=msg,
admin_route='admin.post_voting',
votes=votes, term=term)
else:
abort(404)
def create_rule(sub):
""" Creates a new rule (from edit rule page) """
try:
sub = Sub.get(fn.Lower(Sub.name) == sub.lower())
except Sub.DoesNotExist:
abort(404)
if not current_user.is_mod(sub.sid, 1) and not current_user.is_admin():
abort(403)
form = CreateSubRule()
if form.validate():
allowed_rules = re.compile("^[a-zA-Z0-9._ -]+$")
if not allowed_rules.match(form.text.data):
return jsonify(status='error', error=[_('Rule has invalid characters')])
SubRule.create(sid=sub.sid, text=form.text.data)
return jsonify(status='ok')
return json.dumps({'status': 'error', 'error': get_errors(form)})
def queryJid(self, jid):
jid = jid.lower()
jids = UserHistory.select().where(
(pw.fn.Lower(pw.fn.Substr(UserHistory.jid, 1, len(jid))) == jid)
)
jids = list(jids)
if len(jids) > 0:
instance = jids[0]
others = dict(selStart=len(jid), selEnd=len(instance.jid))
self._obj = peeweeWrapper(jids[0], others)
return self._obj
return QtCore.QVariant()
def create_flair(sub):
""" Creates a new flair (from edit flair page) """
try:
sub = Sub.get(fn.Lower(Sub.name) == sub.lower())
except Sub.DoesNotExist:
abort(404)
if not current_user.is_mod(sub.sid, 1) and not current_user.is_admin():
abort(403)
form = CreateSubFlair()
if form.validate():
allowed_flairs = re.compile("^[a-zA-Z0-9._ -]+$")
if not allowed_flairs.match(form.text.data):
return jsonify(status='error', error=[_('Flair has invalid characters')])
SubFlair.create(sid=sub.sid, text=form.text.data)
return jsonify(status='ok')
return json.dumps({'status': 'error', 'error': get_errors(form)})