Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.network_api.disassociate(context, id)
self.network_api.delete(context, id)
except exception.PolicyNotAuthorized as e:
_rollback_quota(reservation)
raise exc.HTTPForbidden(explanation=six.text_type(e))
except exception.NetworkInUse as e:
_rollback_quota(reservation)
raise exc.HTTPConflict(explanation=e.format_message())
except exception.NetworkNotFound:
_rollback_quota(reservation)
msg = _("Network not found")
raise exc.HTTPNotFound(explanation=msg)
if CONF.enable_network_quota and reservation:
QUOTAS.commit(context, reservation)
response = webob.Response(status_int=202)
return response
context = {
"root": req.application_url,
"location": req.path_url,
"breadcrumbs": breadcrumbs,
"directories": directories,
"files": files,
"version": __version__,
}
if catalog:
context['location'] = req.path.replace('catalog.xml', '')
template_name = 'catalog.xml'
response_content_type = 'application/xml'
template = self.env.get_template(template_name)
return Response(
body=template.render(context),
content_type=response_content_type,
charset="utf-8")
def get_certificate_md5(self, lb_id, filename):
self._check_ssl_filename_format(filename)
cert_path = self._cert_file_path(lb_id, filename)
path_exists = os.path.exists(cert_path)
if not path_exists:
return webob.Response(json=dict(
message='Certificate Not Found',
details="No certificate with filename: {f}".format(
f=filename)), status=404)
with open(cert_path, 'r') as crt_file:
cert = crt_file.read()
md5 = hashlib.md5(octavia_utils.b(cert)).hexdigest() # nosec
resp = webob.Response(json=dict(md5sum=md5))
resp.headers['ETag'] = md5
return resp
def process_request(self, request):
if request.method == 'OPTIONS':
response = Response(status='200 OK', body=b'')
response.headerlist += self._prepare_cors_headers()
return response
if not self.verify_auth_token(request):
return self.http_error('403 Forbidden', error='Invalid auth token')
# Clear all the existing state, if any
ctx.__dict__.clear()
ctx.request = request
path = request.path_info
if path in self.mapping:
func = self.mapping[path]
response = func(request)
response.headerlist += self._prepare_cors_headers()
@wrap_error
def get_xsd_atom_contract(self, req, xsd):
resp = Response()
return template.static_file(resp, req, "/xsd/atom/" + xsd,
root=get_app_root(),
mimetype="application/xml")
def make_json_error(ex):
code = ex.code if isinstance(ex, exceptions.HTTPException) else 500
response = webob.Response(json={'error': str(ex), 'http_code': code})
response.status_code = code
return response
def send_result(code, req, result):
content = None
resp = Response()
resp.headers['content-type'] = None
resp.status = code
if code > 399:
return resp
if result:
if is_xml_response(req):
content = result.to_xml()
resp.headers['content-type'] = "application/xml"
else:
content = result.to_json()
resp.headers['content-type'] = "application/json"
resp.content_type_params={'charset' : 'UTF-8'}
resp.unicode_body = content.decode('UTF-8')
def make_response(self, filename):
# Trovo il tipo di file e l'encoding usato nella pagina web
filetype, encoding = mimetypes.guess_type(filename)
# Se il tipo di file non e' trovato, allora gli assegno come tipo di file il piu' generico possibile
if filetype is None:
filetype = 'application/octet-stream'
# Response e' una classe che contiene appunto la risposta di un server a una richiesta HTTP
res = Response(content_type=filetype)
# Assegno al body della risposta cio' che riesco a leggere dalla pagina web
res.body = open(filename, 'rb').read()
return res
def __call__(self, req):
"""Respond to a request for all Sahara API versions."""
path = req.environ['PATH_INFO']
if re.match(r"^/*$", path):
response = webob.Response(request=req, status=300,
content_type="application/json")
response.body = jsonutils.dumps(self._get_versions(req))
return response
else:
return self.application
def _access_module(self, switchid, func, waiters=None):
try:
dps = self._OFS_LIST.get_ofs(switchid)
except ValueError, message:
return Response(status=400, body=str(message))
msgs = []
for f_ofs in dps.values():
function = getattr(f_ofs, func)
msg = function() if waiters is None else function(waiters)
msgs.append(msg)
body = json.dumps(msgs)
return Response(content_type='application/json', body=body)