add_header(response)
s = request.environ.get('beaker.session')
# Accepts standard http auth
auth = parse_auth(request.get_header('Authorization', ''))
if 'session' in request.POST or 'session' in request.GET:
# removes "' so it works on json strings
s = s.get_by_id(remove_chars(request.params.get('session'), "'\""))
elif auth:
user = PYLOAD.checkAuth(auth[0], auth[1], request.environ.get('REMOTE_ADDR', None))
# if auth is correct create a pseudo session
if user: s = {'uid': user.uid}
api = get_user_api(s)
if not api:
return HTTPError(401, dumps("Unauthorized"), **response.headers)
if not PYLOAD.isAuthorized(func, api.user):
return HTTPError(403, dumps("Forbidden"), **response.headers)
if not hasattr(PYLOAD.EXTERNAL, func) or func.startswith("_"):
print "Invalid API call", func
return HTTPError(404, dumps("Not Found"), **response.headers)
# TODO: possible encoding
# TODO Better error codes on invalid input
args = [loads(unquote(arg)) for arg in args.split("/")[1:]]
kwargs = {}
# accepts body as json dict
if request.json:
s = s.get_by_id(remove_chars(request.params.get('session'), "'\""))
elif auth:
user = PYLOAD.checkAuth(auth[0], auth[1], request.environ.get('REMOTE_ADDR', None))
# if auth is correct create a pseudo session
if user: s = {'uid': user.uid}
api = get_user_api(s)
if not api:
return HTTPError(401, dumps("Unauthorized"), **response.headers)
if not PYLOAD.isAuthorized(func, api.user):
return HTTPError(403, dumps("Forbidden"), **response.headers)
if not hasattr(PYLOAD.EXTERNAL, func) or func.startswith("_"):
print "Invalid API call", func
return HTTPError(404, dumps("Not Found"), **response.headers)
# TODO: possible encoding
# TODO Better error codes on invalid input
args = [loads(unquote(arg)) for arg in args.split("/")[1:]]
kwargs = {}
# accepts body as json dict
if request.json:
kwargs = request.json
# convert arguments from json to obj separately
for x, y in chain(request.GET.iteritems(), request.POST.iteritems()):
if not x or not y or x == "session": continue
kwargs[x] = loads(unquote(y))
def abort(code=500, text='Unknown Error: Application stopped.'):
    """Stop request handling by raising an HTTP error response.

    Args:
        code: HTTP status code to send (defaults to 500).
        text: Message carried by the error response.

    Raises:
        HTTPError: always -- this function never returns normally.
    """
    error = HTTPError(code, text)
    raise error
def deleteNetwork(self, dbSession, networkId):
    """Delete the overlay network identified by ``networkId``.

    The network row is fetched through the DAO, the request is logged, and
    the object is handed to the overlay service together with the ``force``
    flag obtained from ``self._getForce()``.

    Args:
        dbSession: database session used for the lookup and the delete.
        networkId: identifier of the OverlayNetwork to remove.

    Returns:
        bottle.HTTPResponse with status 204 on success.

    Raises:
        bottle.HTTPError: 404 carrying OverlayNetworkNotFound when no row
            matches ``networkId``; 500 carrying PlatformError for any other
            failure (stack trace is logged at debug level first). HTTPErrors
            raised by the overlay layer itself are re-raised untouched.
    """
    try:
        network = self.__dao.getObjectById(dbSession, OverlayNetwork, networkId)
        logger.info("OverlayNetwork[id='%s', name='%s']: delete request is submitted", network.id, network.name)
        self._overlay.deleteNetwork(dbSession, network, self._getForce())
    except bottle.HTTPError:
        # Already an HTTP-level error: pass it through unchanged.
        raise
    except exc.NoResultFound as ex:
        logger.debug("No Overlay Network found with Id: '%s', exc.NoResultFound: %s", networkId, ex.message)
        raise bottle.HTTPError(404, exception=OverlayNetworkNotFound(networkId))
    except Exception as ex:
        # NOTE(review): ex.message is forwarded to the client inside the 500;
        # confirm PlatformError does not expose internal details to callers.
        logger.debug('StackTrace: %s', traceback.format_exc())
        raise bottle.HTTPError(500, exception=PlatformError(ex.message))
    return bottle.HTTPResponse(status=204)
def getDeviceConfigsInZip(self, dbSession, podId):
    """Return the device configuration files of a pod as a single zip archive.

    Args:
        dbSession: database session used to look the pod up.
        podId: identifier of the pod whose device configs are requested.

    Returns:
        The archive produced by ``UnderlayRestRoutes.createZipArchive``;
        the response Content-Type is set to ``application/zip``.

    Raises:
        bottle.HTTPError: 404 with PodNotFound when the pod does not exist,
            or 404 with DeviceConfigurationNotFound when the pod exists but
            no archive could be built.
    """
    pod = self.report.getPod(dbSession, podId)
    if pod is None:
        raise bottle.HTTPError(404, exception=PodNotFound(podId))
    logger.debug('Pod name: %s', pod.name)
    # NOTE(review): device configs typically carry credentials; confirm this
    # route is only reachable through the authenticated API layer.
    archive = UnderlayRestRoutes.createZipArchive(pod)
    if archive is None:
        raise bottle.HTTPError(404, exception=DeviceConfigurationNotFound("Pod exists but no configs for devices.'%s " % (pod.name)))
    bottle.response.headers['Content-Type'] = 'application/zip'
    return archive
breaking out of the ``root`` directory and leaking sensitive information
to an attacker.
Read-protected files or files outside of the ``root`` directory are
answered with ``403 Access Denied``. Missing files result in a
``404 Not Found`` response. Conditional requests (``If-Modified-Since``,
``If-None-Match``) are answered with ``304 Not Modified`` whenever
possible. ``HEAD`` and ``Range`` requests (used by download managers to
check or continue partial downloads) are also handled automatically.
"""
root = os.path.join(os.path.abspath(root), '')
filename = os.path.abspath(os.path.join(root, filename.strip('/\\')))
headers = dict()
if not filename.startswith(root):
return HTTPError(403, "Access denied.")
if not os.path.exists(filename) or not os.path.isfile(filename):
return HTTPError(404, "File does not exist.")
if not os.access(filename, os.R_OK):
return HTTPError(403, "You do not have permission to access this file.")
if mimetype is True:
if download and download is not True:
mimetype, encoding = mimetypes.guess_type(download)
else:
mimetype, encoding = mimetypes.guess_type(filename)
if encoding: headers['Content-Encoding'] = encoding
if mimetype:
if (mimetype[:5] == 'text/' or mimetype == 'application/javascript')\
and charset and 'charset' not in mimetype:
mimetype += '; charset=%s' % charset
address = extract_payload_fields(payload, 'address')[0]
vlan_tag = payload.pop('vlan', 0)
if vlan_tag:
iface = context.service.vlan.allocate_stack(vlan_tag).iface
else:
iface = None
try:
entity = model.IpAddress(address, iface=iface, **payload)
except TypeError as e:
return bottle.HTTPError(400, str(e))
try:
context.service.address.create(entity)
except exc.ServiceCreateCollisionError as e:
return bottle.HTTPError(400, str(e))
bottle.response.status = 201
return address_response(entity)
def gh_merge_do(owner, repo, pr_id):
    """Merge a pull request on behalf of its author, after a series of gates.

    The caller is identified from the OAuth token on the current request and
    must be the PR author; the PR must be open, mergeable, green and flagged as
    self-mergeable by ``github.is_pull_request_self_mergeable``. The gates are
    evaluated in order and the first failing one aborts with 403.

    Args:
        owner: repository owner.
        repo: repository name.
        pr_id: pull request number.

    Raises:
        bottle.HTTPError: 403 with a human-readable reason when any gate fails.

    Returns:
        Never returns normally on success: redirects to the PR's html_url.
    """
    import github
    user = github.user_from_oauth(bottle.request.oauth_token)
    pr = github.get_pull_request(owner, repo, pr_id)

    # Ordered gates; each predicate runs only if the previous ones passed, so
    # the key lookups further down are safe once the identity checks hold.
    gates = (
        (lambda: 'login' not in user, 'Could not identify user'),
        (lambda: 'user' not in pr, 'Could not identify PR'),
        (lambda: user['login'] != pr['user']['login'], 'Merge requester is not the PR author'),
        (lambda: pr['merged'], 'PR is already merged'),
        (lambda: not pr['mergeable'], 'PR cannot be merged. Please rebase'),
        (lambda: not github.is_pull_request_buildable(pr), 'PR status not green. Wait or fix errors'),
        (lambda: not github.is_pull_request_self_mergeable(pr), 'Nobody allowed you to merge this PR'),
    )
    for blocked, reason in gates:
        if blocked():
            raise bottle.HTTPError(403, reason)

    github.merge_pr(pr)
    bottle.redirect(pr['html_url'])
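def deleteAggregatedL2port(self, dbSession, aggregatedL2portId):
    """Delete the overlay aggregated L2 port identified by ``aggregatedL2portId``.

    Mirrors ``deleteNetwork``: fetch the row through the DAO, log the request,
    and hand the object to the overlay service with the ``force`` flag from
    ``self._getForce()``.

    Args:
        dbSession: database session used for the lookup and the delete.
        aggregatedL2portId: identifier of the OverlayAggregatedL2port to remove.

    Returns:
        bottle.HTTPResponse with status 204 on success.

    Raises:
        bottle.HTTPError: 404 carrying OverlayAggregatedL2portNotFound when no
            row matches; 500 carrying PlatformError for any other failure
            (stack trace is logged at debug level first). HTTPErrors raised by
            the overlay layer are re-raised untouched.
    """
    try:
        aggregatedL2portObject = self.__dao.getObjectById(dbSession, OverlayAggregatedL2port, aggregatedL2portId)
        # Log the fetched row's id/name. The previous version passed the
        # OverlayAggregatedL2port *class* attributes here, which never reflect
        # the row being deleted; this now matches deleteNetwork.
        logger.info("OverlayAggregatedL2port[id='%s', name='%s']: delete request is submitted", aggregatedL2portObject.id, aggregatedL2portObject.name)
        self._overlay.deleteAggregatedL2port(dbSession, aggregatedL2portObject, self._getForce())
    except bottle.HTTPError:
        # Already an HTTP-level error: pass it through unchanged.
        raise
    except exc.NoResultFound as ex:
        logger.debug("No Overlay AggregatedL2port found with Id: '%s', exc.NoResultFound: %s", aggregatedL2portId, ex.message)
        raise bottle.HTTPError(404, exception=OverlayAggregatedL2portNotFound(aggregatedL2portId))
    except Exception as ex:
        # NOTE(review): ex.message is forwarded to the client inside the 500;
        # confirm PlatformError does not expose internal details to callers.
        logger.debug('StackTrace: %s', traceback.format_exc())
        raise bottle.HTTPError(500, exception=PlatformError(ex.message))
    return bottle.HTTPResponse(status=204)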