Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
env.update({
'REQUEST_METHOD': 'GET',
'HTTP_IF_NONE_MATCH': response.get_etag()[0]
})
response.make_conditional(env)
assert 'date' in response.headers
# after the thing is invoked by the server as wsgi application
# (we're emulating this here), there must not be any entity
# headers left and the status code would have to be 304
resp = wrappers.Response.from_app(response, env)
assert resp.status_code == 304
assert not 'content-length' in resp.headers
# make sure date is not overriden
response = wrappers.Response('Hello World')
response.date = 1337
d = response.date
response.make_conditional(env)
assert response.date == d
# make sure content length is only set if missing
response = wrappers.Response('Hello World')
response.content_length = 999
response.make_conditional(env)
self.assert_equal(response.content_length, 999)
def stats(request):
copy_stream(request)
f1 = request.files["file1"]
f2 = request.files["file2"]
text = request.form["text"]
f1.save(request.stat_folder + "/file1.bin")
f2.save(request.stat_folder + "/file2.bin")
with open(request.stat_folder + "/text.txt", "w") as f:
f.write(text.encode("utf-8"))
return Response("Done.")
def test_range_request_with_file():
env = create_environ()
resources = os.path.join(os.path.dirname(__file__), "res")
fname = os.path.join(resources, "test.txt")
with open(fname, "rb") as f:
fcontent = f.read()
with open(fname, "rb") as f:
response = wrappers.Response(wrap_file(env, f))
env["HTTP_RANGE"] = "bytes=0-0"
response.make_conditional(
env, accept_ranges=True, complete_length=len(fcontent)
)
assert response.status_code == 206
assert response.headers["Accept-Ranges"] == "bytes"
assert response.headers["Content-Range"] == "bytes 0-0/%d" % len(fcontent)
assert response.headers["Content-Length"] == "1"
assert response.data == fcontent[:1]
service = environ['PATH_INFO'][len('/xmlrpc/'):]
if environ['PATH_INFO'].startswith('/xmlrpc/2/'):
service = service[len('2/'):]
string_faultcode = False
params, method = xmlrpclib.loads(data)
try:
result = flectra.http.dispatch_rpc(service, method, params)
response = xmlrpclib.dumps((result,), methodresponse=1, allow_none=False)
except Exception as e:
if string_faultcode:
response = xmlrpc_handle_exception_string(e)
else:
response = xmlrpc_handle_exception_int(e)
return werkzeug.wrappers.Response(
response=response,
mimetype='text/xml',
)(environ, start_response)
def getWay(self, request, id):
api = OsmApi()
data = api.getWayById(long(id))
outputter = OsmXmlOutput()
return Response(outputter.iter(data), content_type='text/xml')
def storage(app, environ, request, version, uid):
if request.method == 'DELETE':
if request.headers.get('X-Confirm-Delete', '0') == '1':
app.initialize(uid, request.authorization.password)
return Response(json.dumps(time.time()), 200)
return Response('Precondition Failed', 412)
def _dispatch_jsonrpc_request(self, request):
json_output = self.registry.dispatch(request)
if json_output is None:
return Response()
return Response(json_output, mimetype="application/json")
def __call__(self, request):
dbname = random.choice(self.dbnames)
conn_info = dict(self.conn_info_no_dbname, dbname=dbname)
conn = psycopg2.connect(**conn_info)
conn.set_session(readonly=True, autocommit=True)
try:
cursor = conn.cursor()
cursor.execute('select 1')
records = cursor.fetchall()
assert len(records) == 1
assert len(records[0]) == 1
assert records[0][0] == 1
finally:
conn.close()
return Response('OK', mimetype='text/plain')
if not action:
action = ServerActions.search([('website_path', '=', path_or_xml_id_or_id), ('website_published', '=', True)], limit=1)
if not action:
try:
action_id = int(path_or_xml_id_or_id)
except ValueError:
pass
# check it effectively exists
if action_id:
action = ServerActions.browse(action_id).exists()
# run it, return only if we got a Response object
if action:
if action.state == 'code' and action.website_published:
action_res = action.run()
if isinstance(action_res, werkzeug.wrappers.Response):
return action_res
return request.redirect('/')
def view_method(*args, **kwargs):
response_val = f(*args, **kwargs)
if isinstance(response_val, werkzeug.wrappers.Response):
return response_val
if isinstance(response_val, flask.Response):
return response_val
if isinstance(response_val, dict):
model = dict(response_val)
else:
model = dict()
if template_file and not isinstance(response_val, dict):
raise Exception(
"Invalid return type {}, we expected a dict as the return value.".format(type(response_val)))
if template_file:
response_val = flask.render_template(template_file, **response_val)