Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Generate a random token
user_token = codecs.encode(os.urandom(32), 'hex').decode('utf-8')
# Encrypt the email with the token
encrypted_email = encrypt(email, user_token)
# Generate a user ID from the token
hmac_secret = request.registry.settings['userid_hmac_secret']
userID = utils.hmac_digest(hmac_secret, user_token)
# Store the encrypted user ID with the token
session_ttl = portier_conf(request, 'session_ttl_seconds')
request.registry.cache.set('portier:{}'.format(userID), encrypted_email, session_ttl)
location = '%s%s' % (stored_redirect, user_token)
return httpexceptions.HTTPFound(location=location)
form = Form(request,
schema=StartSessionForm())
if form.validate():
identifier = form.data["identifier"]
secret = form.data["secret"]
account_name = form.data["account_name"]
db = request.db
session = AuthSession.create(db, identifier, secret, account_name)
if session is None:
raise exc.HTTPForbidden()
else:
log.debug("new auth session for client %s username %s",
identifier, account_name)
return {"auth_token": session.token}
else:
raise exc.HTTPForbidden()
def badge(request):
"""Return the number of public annotations on a given page.
This is for the number that's displayed on the Chrome extension's badge.
Certain pages are blocklisted so that the badge never shows a number on
those pages. The Chrome extension is oblivious to this, we just tell it
that there are 0 annotations.
"""
uri = request.params.get("uri")
if not uri:
raise httpexceptions.HTTPBadRequest()
# Do a cheap check to see if this URI has ever been annotated. If not,
# and most haven't, then we can skip the costs of a blocklist lookup or
# search request. In addition to the Elasticsearch query, the search request
# involves several DB queries to expand URIs and enumerate group IDs
# readable by the current user.
if not _has_uri_ever_been_annotated(request.db, uri):
count = 0
elif models.Blocklist.is_blocked(request.db, uri):
count = 0
else:
query = MultiDict({"uri": uri, "limit": 0})
s = search.Search(request, stats=request.stats)
result = s.run(query)
count = result.total
def add_report(request):
"""
Add an abuse report to the database
:param request: a request object
:return: a redirect to the abuse reports page
"""
if request.authenticated_userid is None:
raise exc.HTTPNotFound()
reporter = request.authenticated_user
if reporter is None:
raise exc.HTTPNotFound()
public_language_id = request.matchdict["public_language_id"]
page_url = urllib.unquote(urllib.unquote(request.matchdict["page_uri"]))
public_group_id = request.matchdict['public_group_id']
user_id = request.matchdict['user_id']
page = annotran.pages.models.Page.get_by_uri(page_url)
author = h.models.User.get_by_username(user_id)
reporter = h.models.User.get_by_username(request.authenticated_user.username)
group = h.groups.models.Group.get_by_pubid(public_group_id)
language = annotran.languages.models.Language.get_by_public_language_id(public_language_id)
query_args = urlencode({
'login_hint': request.validated['email'],
'scope': portier_conf(request, 'requested_scope'),
'nonce': nonce,
'response_type': 'id_token',
'response_mode': 'form_post',
# Get the data from the config because the request might only
# have local network information and not the public facing ones.
'client_id': '{scheme}://{host}'.format(scheme=request.registry.settings['http_scheme'],
host=request.registry.settings['http_host']),
'redirect_uri': request.route_url(verify.name),
})
location = form_url.format(broker_uri=broker_uri, query_args=query_args)
return httpexceptions.HTTPFound(location=location)
def historical_maps(request):
release_year = request.params.get('release_year')
if release_year is None:
raise exc.HTTPBadRequest('Please provide a parameter realease_year')
bildnummer = request.params.get('bildnummer')
if bildnummer is None:
raise exc.HTTPBadRequest('Please provide a map number (param: bildnummer)')
# In order to use the validator and the associated service
request.matchdict['map'] = 'api'
layerId = request.params.get('layer')
request.matchdict['layerId'] = layerId
featureId = bildnummer + '_' + release_year
request.matchdict['featureId'] = featureId
# Call service directly
params = ExtendedHtmlPopupServiceValidation(request)
feature, vectorModel = next(_get_features(params))
featureBbox = parse_box2d(feature['feature'].properties['box2d'])
center = center_from_box2d(featureBbox)
def badge_add(request):
uri = request.params["add"]
item = models.Blocklist(uri=uri)
request.db.add(item)
# There's a uniqueness constraint on `uri`, so we flush the session,
# catching any IntegrityError and responding appropriately.
try:
request.db.flush()
except IntegrityError:
request.db.rollback()
msg = _("{uri} is already blocked.").format(uri=uri)
request.session.flash(msg, "error")
index = request.route_path("admin.badge")
return httpexceptions.HTTPSeeOther(location=index)
def wrapper(request, *args, user=None, **kwargs):
"""Redirect user."""
if request.feature("new_oauth"):
return view_function(request, *args, user=user, **kwargs)
if oauth_condition(request) is False:
return view_function(request, *args, user=user, **kwargs)
ai_getter = request.find_service(name="ai_getter")
consumer_key = request.params["oauth_consumer_key"]
try:
client_id = ai_getter.developer_key(consumer_key)
lms_url = ai_getter.lms_url(consumer_key)
except ConsumerKeyError:
raise exc.HTTPInternalServerError()
authorization_base_url = build_auth_base_url(
lms_url, authorization_base_endpoint
)
redirect_uri = build_redirect_uri(request.url, redirect_endpoint)
oauth_session = requests_oauthlib.OAuth2Session(
client_id, redirect_uri=redirect_uri
)
authorization_url, state_guid = oauth_session.authorization_url(
authorization_base_url
)
oauth_state = find_or_create_from_user(
request.db, state_guid, user, request.params
def add_language(request):
"""
This view adds a language
:param request: a request object
:return: None
"""
if request.authenticated_userid is None:
raise exc.HTTPNotFound()
name = request.matchdict["language"]
public_group_id = request.matchdict["public_group_id"]
language = models.Language.get_by_name(name)
if not language:
language = models.Language(name=name)
request.db.add(language)
# We need to flush the db session here so that language.id will be generated.
request.db.flush()
url = request.route_url('translation_read', public_language_id=language.pubid, public_group_id=public_group_id)
return exc.HTTPSeeOther(url)
@view.view_config(context=httpexceptions.HTTPServerError)
def httperror(self):
self.request.response.status_int = self.exc.status_int
# If code raises an HTTPError or HTTPServerError we assume this was
# deliberately raised and:
# 1. Show the user an error page including specific error message
# 2. _Do not_ report the error to Sentry.
return {"message": str(self.exc)}