Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def metrics():
bottle.response.content_type = prom.CONTENT_TYPE_LATEST
return prom.generate_latest(prom.REGISTRY)
def getDeviceConfigsInZip(self, dbSession, podId):
pod = self.report.getPod(dbSession, podId)
if pod is None:
raise bottle.HTTPError(404, exception=PodNotFound(podId))
logger.debug('Pod name: %s', pod.name)
zippedConfigFiles = UnderlayRestRoutes.createZipArchive(pod)
if zippedConfigFiles is not None:
bottle.response.headers['Content-Type'] = 'application/zip'
return zippedConfigFiles
else:
raise bottle.HTTPError(404, exception=DeviceConfigurationNotFound("Pod exists but no configs for devices.'%s " % (pod.name)))
def wrapper(*args, **kwargs):
log = open('/var/log/ewsput.txt', 'a')
log.write('%s %s %s %s %s \n' % (request.remote_addr, datetime.now().strftime('%H:%M'),
request.method, request.url, response.status))
log.close()
req = func(*args, **kwargs)
return req
return wrapper
@wraps(f)
def json_dumps(*args, **kwargs):
r = f(*args, **kwargs)
response.content_type = "application/json; charset=UTF-8"
if r and type(r) in (dict, list, tuple):
return dumps(r)
if r and type(r) is str:
return r
return json_dumps
def set_theme(name):
"""Set the name of theme in cookie.
:name: theme's name
"""
bottle.response.set_cookie("theme", name, path='/')
def _func(ds_id=None):
try:
params = body_to_json(bottle.request)
logger.info(f'Received {request_name} request: {params}')
ds_man = _create_dataset_manager(DB())
res = handler(ds_man, ds_id, params)
return {'status': OK['status'], 'ds_id': ds_id or res.get('ds_id', None)}
except UnknownDSID as e:
logger.warning(e)
bottle.response.status = NOT_EXIST['status_code']
return {'status': NOT_EXIST['status'], 'ds_id': ds_id}
except DSIsBusy as e:
logger.warning(e)
bottle.response.status = BUSY['status_code']
return {'status': BUSY['status'], 'ds_id': ds_id}
except Exception as e:
logger.exception(e)
bottle.response.status = INTERNAL_ERROR['status_code']
return {'status': INTERNAL_ERROR['status'], 'ds_id': ds_id}
def logout_post():
"""Clear session cookies."""
response.delete_cookie('email')
return redirect('/')
def wrapper(*a, **ka):
''' Encapsulate the result in json '''
output = callback(*a, **ka)
if self.in_supported_types(request.headers.get('Accept', '')):
response_object = self.get_response_object(0)
response_object['data'] = output
json_response = json_dumps(response_object)
response.content_type = 'application/json'
return json_response
else:
return output
return wrapper
try:
params['q'] = modify_query(params, rp_db_map)
except Exception as e:
print "EXC:", e
pass
headers = request.headers
cookies = request.cookies
r = requests.get(url=forward_url +'/'+ path, params=params, headers=headers, cookies=cookies, stream=True) # get data from influx
if r.status_code == 200:
for key, value in dict(r.headers).iteritems():
response.set_header(key, value)
for key, value in dict(r.cookies).iteritems():
response.cookies[key] = value
pass
else:
abort(r.status_code, r.reason) # NOK, return error
return r.raw
def _CaptureException(f, *args, **kwargs):
"""Decorator implementation for capturing exceptions."""
try:
r = f(*args, **kwargs)
except HTTPResponse:
raise # redirect() uses exceptions
except Exception as e:
r = capture_return_exception(e)
if hasattr(e, 'code'):
response.status = e.code
return r