Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
with pytest.raises(werkzeug.exceptions.Forbidden):
match_webhook_secret(request_ctx)
request_ctx.headers = {'X-Hub-Signature': None}
with pytest.raises(werkzeug.exceptions.Forbidden):
match_webhook_secret(request_ctx)
key, data = 'testkey', 'testdata'
hmac_obj = hmac.new(key.encode(),
data.encode())
request_ctx.headers = {
'X-Hub-Signature': f'{hmac_obj.name}={hmac_obj.hexdigest()}'
}
with pytest.raises(werkzeug.exceptions.NotImplemented):
match_webhook_secret(request_ctx)
hmac_obj = hmac.new(key.encode(),
data.encode(),
digestmod="sha1")
request_ctx.headers = {
'X-Hub-Signature': f'sha1={hmac_obj.hexdigest()}'
}
request_ctx.data = data.encode()
monkeypatch.setenv('GITHUB_PAYLOAD_SECRET', 'wrongkey')
with pytest.raises(werkzeug.exceptions.Forbidden):
match_webhook_secret(request_ctx)
monkeypatch.setenv('GITHUB_PAYLOAD_SECRET', key)
def test_proxy_exception():
    """Check that aborting with a response object proxies that response.

    ``exceptions.abort`` is called with a ready-made ``Response``; the
    raised ``HTTPException`` must return the very same object from
    ``get_response`` with its body intact.
    """
    wrapped = Response("Hello World")

    with pytest.raises(exceptions.HTTPException) as caught:
        exceptions.abort(wrapped)

    # An empty mapping is passed as the argument to get_response.
    returned = caught.value.get_response({})
    assert returned is wrapped
    assert returned.get_data() == b"Hello World"
# In the ideal world only JavaScript can be used to add a custom header, and
# only within its origin. By default, browsers don't allow JavaScript to
# make cross origin requests.
#
# Unfortunately, in the real world, due to bugs in browser plugins, it can't
# be guaranteed that a page won't send an HTTP request with a custom header
# set. That's why we also check the contents of the header via an HMAC check
# with a server-stored secret.
#
# See for more details:
# https://www.owasp.org/index.php/Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet
# (Protecting REST Services: Use of Custom Request Headers).
csrf_token = utils.SmartStr(request.headers.get("X-CSRFToken", ""))
if not csrf_token:
logging.info("Did not find headers CSRF token for: %s", request.path)
raise werkzeug_exceptions.Forbidden("CSRF token is missing")
try:
decoded = base64.urlsafe_b64decode(csrf_token + "==")
digest, token_time = decoded.rsplit(CSRF_DELIMITER, 1)
token_time = long(token_time)
except (TypeError, ValueError):
logging.info("Malformed CSRF token for: %s", request.path)
raise werkzeug_exceptions.Forbidden("Malformed CSRF token")
if len(digest) != hashlib.sha256().digest_size:
logging.info("Invalid digest size for: %s", request.path)
raise werkzeug_exceptions.Forbidden("Malformed CSRF token digest")
expected = GenerateCSRFToken(request.user, token_time)
if not constant_time.bytes_eq(csrf_token, expected):
logging.info("Non-matching CSRF token for: %s", request.path)
akw[key] = value
else:
akw[key] = type(value)
try:
r = self._call_function(**self.params)
except werkzeug.exceptions.HTTPException, e:
r = e
except Exception, e:
_logger.exception("An exception occured during an http request")
se = serialize_exception(e)
error = {
'code': 200,
'message': "OpenERP Server Error",
'data': se
}
r = werkzeug.exceptions.InternalServerError(cgi.escape(simplejson.dumps(error)))
else:
if not r:
r = werkzeug.wrappers.Response(status=204) # no content
return r
if not scheduler.delete_job(job_id):
failed += 1
continue
except Exception as e:
error.append(str(e))
pass
if not_found:
error.append('%d job%s not found.' % (not_found, '' if not_found == 1 else 's'))
if failed:
error.append('%d job%s failed to delete.' % (failed, '' if failed == 1 else 's'))
if len(error) > 0:
error = ' '.join(error)
raise werkzeug.exceptions.BadRequest(error)
return 'Jobs deleted.'
if not flask.request.context.is_admin:
scope_key = CONF.collect.scope_key
if filters:
filters[scope_key] = flask.request.context.project_id
else:
filters = {scope_key: flask.request.context.project_id}
results = self._storage.retrieve(
begin=begin, end=end,
filters=filters,
metric_types=metric_types,
offset=offset, limit=limit,
)
if results['total'] < 1:
raise http_exceptions.NotFound(
"No resource found for provided filters.")
return {
'total': results['total'],
'dataframes': results['dataframes'],
}
def pre_post_organizations(request):
    """Refuse the call unless the current user may create organizations.

    The ``request`` argument is accepted but not inspected here; the
    decision is based solely on the user returned by ``current_user()``.

    Raises:
        wz_exceptions.Forbidden: when the user lacks the
            ``'create-organization'`` capability.
    """
    may_create = current_user().has_cap('create-organization')
    if may_create:
        return
    raise wz_exceptions.Forbidden()
def _parse_urlencoded(self, stream, mimetype, content_length, options):
    """Decode a URL-encoded body read from ``stream``.

    ``mimetype`` and ``options`` are part of the signature but are not
    used by this method.

    Returns:
        A 3-tuple ``(stream, form, self.cls())`` where ``form`` is the
        result of ``url_decode_stream``.

    Raises:
        exceptions.RequestEntityTooLarge: when both
            ``self.max_form_memory_size`` and ``content_length`` are set
            and the content length exceeds the limit.
    """
    limit = self.max_form_memory_size
    if limit is not None and content_length is not None:
        # Only enforce the cap when both sides of the comparison are known.
        if content_length > limit:
            raise exceptions.RequestEntityTooLarge()

    decoded = url_decode_stream(
        stream, self.charset, errors=self.errors, cls=self.cls
    )
    return stream, decoded, self.cls()
@app.errorhandler(werkzeug.exceptions.BadRequest)
def handle_bad_request(e):
    """Handle ``BadRequest`` errors registered on ``app``.

    Prints the error to standard output and returns an empty body
    together with the status code 400.
    """
    print('Bad Request:', e)
    body, status = '', 400
    return body, status
Also allows admins to become another user based on oid or fbid.
Returns a User object if the user is logged in, or None otherwise.
"""
req = flask.request
if hasattr(req, 'current_user'):
return req.current_user
api_key = req.values.get('api_key')
if api_key and is_api_request():
req.current_user = m.User.objects(api_key=api_key).first()
if not req.current_user:
# TODO(mack): change exceptions to not return html, but just the
# error text
raise exceptions.ImATeapot('Invalid api key %s' % api_key)
elif SESSION_COOKIE_KEY_USER_ID in flask.session:
user_id = flask.session[SESSION_COOKIE_KEY_USER_ID]
req.current_user = m.User.objects.with_id(user_id)
# req.current_user can be None if the client still has a cookie with
# this user_id, but the corresponding User's account has been deleted.
if req.current_user:
req.current_user.update(set__last_visited=datetime.datetime.now())
else:
req.current_user = None
if req.current_user and req.current_user.is_admin:
oid = req.values.get('as_oid', '')
fbid = req.values.get('as_fbid', '')
if oid:
try: