Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _register_name_unclear_error(self):
payload = json.dumps({"status": 409, "code": "unexistent_error_code"}).encode()
response_code = 409
content_type = "application/json"
return response.Response(
payload, response_code, [("Content-Type", content_type)]
)
def index(request):
assert request.json == data
assert request.text == json.dumps(data)
assert not request.POST
capture_message("hi")
return Response("ok")
def change_status(request):
topic_id = request.matchdict.get('topic_id')
status = request.matchdict.get('status')
default_status = ['publish', 'suspend', 'delete']
if status not in default_status:
return Response('This status not allow', status='500')
topic = models.Topic.objects.with_id(topic_id)
if topic is None:
return Response('Not Found, topic title:%s'%topic_id, status='404 Not Found')
topic.status = status
topic.save()
if status == 'delete':
return HTTPFound(location=request.route_path('manager.topics.problem'))
return HTTPFound(location=request.route_path('forums.topics.index'))
def update_auto(request: Request):
car_id = request.matchdict.get('car_id')
car = Repository.car_by_id(car_id)
if car_id == '__first__':
car_id = Repository.all_cars()[0].id
if not car:
msg = "The car with id '{}' was not found.".format(car_id)
return Response(status=404, json_body={'error': msg})
try:
car_data = request.json_body
except:
return Response(status=400, body='Could not parse your post as JSON.')
vm = UpdateAutoViewModel(car_data, car_id)
vm.compute_details()
if vm.errors:
return Response(status=400, body=vm.error_msg)
try:
Repository.update_car(vm.car)
return Response(status=204, body='Car updated successfully.')
except:
return Response(status=400, body='Could not update car.')
setting_disable_check = request.env.core.settings.get(sett_name, 'false').lower()
if setting_disable_check not in ('true', 'yes', '1'):
request.resource_permission(PD_READ)
z = int(request.GET['z'])
x = int(request.GET['x'])
y = int(request.GET['y'])
req = obj.render_request(obj.srs)
img = req.render_tile((z, x, y), 256)
buf = StringIO()
img.save(buf, 'png')
buf.seek(0)
return Response(body_file=buf, content_type='image/png')
"""
try:
inbound = request.body
inbound_json = json.loads(inbound)
key = inbound_json["header"]["jwk"]
body = json.dumps(
{
"key": key,
"status": "good",
"contact": ["mailto:cert-admin@example.com", "tel:+12025551212"],
},
sort_keys=True,
)
except:
raise ValueError("invalid input")
return Response(
body=body,
status_code=201,
headers={
"Link": ';rel="terms-of-service"',
"Date": datetime.datetime.utcnow().isoformat(),
},
from sqlalchemy.sql.functions import func
mimetype, create_date, file_identity = File.default_db.query(
File.mime_type, File.creation_date, file.file_identity
).filter_by(id=int(document_id)).first()
size = path.getsize(File.path_of(file_identity))
return Response(
body=None, content_type=str(mimetype),
content_length=size, last_modified=create_date)
try:
result = requests.head(url, timeout=15)
except requests.ConnectionError:
return Response(
status=503,
location=url)
return Response(
content_type=result.headers.get('Content-Type', None),
status=result.status_code,
location=result.url)
self.content = list(app_iter)
self.index = 0
def __iter__(self):
return self
def __next__(self):
try:
return self.content[self.index]
except IndexError:
raise StopIteration
finally:
self.index += 1
class SwitchProtocolsResponse(Response):
"""Upgrade from a WSGI connection with the WebSocket handshake."""
def __init__(self, environ, switch_protocols):
super().__init__()
self.status_int = 101
http_1_1 = environ['SERVER_PROTOCOL'] == 'HTTP/1.1'
def get_header(k):
key_map = {k.upper(): k for k in environ}
return environ[key_map['HTTP_' + k.upper().replace('-', '_')]]
key = websockets.handshake.check_request(get_header)
if not http_1_1 or key is None:
self.status_int = 400
}
page.append(define.render('user/favorites.html', [
# Profile information
userprofile,
# User information
profile.select_userinfo(otherid, config=userprofile['config']),
# Relationship
profile.select_relation(request.userid, otherid),
# Feature
form.feature,
# Favorites
faves,
]))
return Response(define.common_page_end(request.userid, page))
if r.raw.closed and r.next:
body = r.next.body
else:
body = r.raw.read()
if r.status_code < 400:
commit_serial = int(r.headers["X-DEVPI-SERIAL"])
xom.keyfs.wait_tx_serial(commit_serial)
headers = clean_response_headers(r)
headers[str("X-DEVPI-PROXY")] = str("replica")
if r.status_code == 302: # REDIRECT
# rewrite master-related location to our replica site
master_location = r.headers["location"]
outside_url = request.application_url
headers[str("location")] = str(
master_location.replace(xom.config.master_url.url, outside_url))
return Response(status="%s %s" %(r.status_code, r.reason),
body=body,
headers=headers)