Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_header_set_duplication_bug():
headers = Headers([("Content-Type", "text/html"), ("Foo", "bar"), ("Blub", "blah")])
headers["blub"] = "hehe"
headers["blafasel"] = "humm"
assert headers == Headers(
[
("Content-Type", "text/html"),
("Foo", "bar"),
("blub", "hehe"),
("blafasel", "humm"),
]
def test_request_handler_returns_process_stdout_when_making_response(self, lambda_output_parser_mock, request_mock):
make_response_mock = Mock()
request_mock.return_value = ("test", "test")
self.service.service_response = make_response_mock
self.service._get_current_route = MagicMock()
self.service._get_current_route.methods = []
self.service._construct_event = Mock()
parse_output_mock = Mock()
parse_output_mock.return_value = ("status_code", Headers({"headers": "headers"}), "body")
self.service._parse_lambda_output = parse_output_mock
lambda_logs = "logs"
lambda_response = "response"
is_customer_error = False
lambda_output_parser_mock.get_lambda_output.return_value = lambda_response, lambda_logs, is_customer_error
service_response_mock = Mock()
service_response_mock.return_value = make_response_mock
self.service.service_response = service_response_mock
result = self.service._request_handler()
self.assertEqual(result, make_response_mock)
lambda_output_parser_mock.get_lambda_output.assert_called_with(ANY)
# Make sure the parse method is called only on the returned response and not on the raw data from stdout
.. versionchanged:: 0.6
Previously that function was called `fix_headers` and modified
the response object in place. Also since 0.6, IRIs in location
and content-location headers are handled properly.
Also starting with 0.6, Werkzeug will attempt to set the content
length if it is able to figure it out on its own. This is the
case if all the strings in the response iterable are already
encoded and the iterable is buffered.
:param environ: the WSGI environment of the request.
:return: returns a new :class:`~werkzeug.datastructures.Headers`
object.
"""
headers = Headers(self.headers)
location = None
content_location = None
content_length = None
status = self.status_code
# iterate over the headers to find all values in one go. Because
# get_wsgi_headers is used each response that gives us a tiny
# speedup.
for key, value in headers:
ikey = key.lower()
if ikey == u'location':
location = value
elif ikey == u'content-location':
content_location = value
elif ikey == u'content-length':
content_length = value
def test_user_status_requests_2(self):
"""Resource list with 2 finished, 1 terminated and 2 error resources
"""
# Create a random test user id that are used for login as admin
user_id = "heinz" + str(randint(0, 10000000))
user_group = self.user_group
password = "1234"
# We need to create an HTML basic authorization header
auth_header = Headers()
auth = bytes('%s:%s' % (user_id, password), "utf-8")
auth_header.add('Authorization', 'Basic ' + base64.b64encode(auth).decode())
# Make sure the user database is empty
user = ActiniaUser(user_id)
if user.exists():
user.delete()
# Create a user in the database and reduce its credentials
self.user = ActiniaUser.create_user(user_id,
user_group,
password,
user_role="admin",
accessible_datasets={"nc_spm_08": ["PERMANENT",
"user1",
"landsat",
"test_mapset"],
mtime = os.path.getmtime(filepath_or_fp)
else:
file = filepath_or_fp
if not filename:
filename = getattr(file, 'name', None)
file.seek(0, 2)
size = file.tell()
file.seek(0)
if mimetype is None and filename:
mimetype = mimetypes.guess_type(filename)[0]
if mimetype is None:
mimetype = 'application/octet-stream'
headers = werkzeug.datastructures.Headers()
if as_attachment:
if filename is None:
raise TypeError('filename unavailable, required for sending as attachment')
headers.add('Content-Disposition', 'attachment', filename=filename)
headers['Content-Length'] = size
data = wrap_file(request.httprequest.environ, file)
rv = Response(data, mimetype=mimetype, headers=headers,
direct_passthrough=True)
if isinstance(mtime, str):
try:
server_format = openerp.tools.misc.DEFAULT_SERVER_DATETIME_FORMAT
mtime = datetime.datetime.strptime(mtime.split('.')[0], server_format)
except Exception:
mtime = None
sorted(random.sample(list(range(ITEMS_PER_ITERATION)), mix_per_iteration))))
for count in range(total_entries):
if not count % ITEMS_PER_ITERATION and make_cocktail:
mix_indices = generate_mix_indices()
iteration_count += 1
if count in mix_indices:
obj_path = next(pool).strip()
else:
obj_path = base_list.pop(0).strip()
yield _get_object_element(object_path=obj_path) + '\n'
yield '\n'
headers = Headers([('Content-Type', 'text/xml')])
return Response(stream_with_context(generate()),
status="200 OK",
headers=headers)
.. versionchanged:: 0.6
Previously that function was called `fix_headers` and modified
the response object in place. Also since 0.6, IRIs in location
and content-location headers are handled properly.
Also starting with 0.6, Werkzeug will attempt to set the content
length if it is able to figure it out on its own. This is the
case if all the strings in the response iterable are already
encoded and the iterable is buffered.
:param environ: the WSGI environment of the request.
:return: returns a new :class:`~werkzeug.datastructures.Headers`
object.
"""
headers = Headers(self.headers)
location = None
content_location = None
content_length = None
status = self.status_code
# iterate over the headers to find all values in one go. Because
# get_wsgi_headers is used each response that gives us a tiny
# speedup.
for key, value in headers:
ikey = key.lower()
if ikey == 'location':
location = value
elif ikey == 'content-location':
content_location = value
elif ikey == 'content-length':
content_length = value
if add_etags:
warn(DeprecationWarning('In future flask releases etags will no '
'longer be generated for file objects passed to the send_file '
'function because this behavior was unreliable. Pass '
'filenames instead if possible, otherwise attach an etag '
'yourself based on another value'), stacklevel=2)
if filename is not None:
if not os.path.isabs(filename):
filename = os.path.join(current_app.root_path, filename)
if mimetype is None and (filename or attachment_filename):
mimetype = mimetypes.guess_type(filename or attachment_filename)[0]
if mimetype is None:
mimetype = 'application/octet-stream'
headers = Headers()
if as_attachment:
if attachment_filename is None:
if filename is None:
raise TypeError('filename unavailable, required for '
'sending as attachment')
attachment_filename = os.path.basename(filename)
headers.add('Content-Disposition', 'attachment',
filename=attachment_filename)
if current_app.use_x_sendfile and filename:
if file is not None:
file.close()
headers['X-Sendfile'] = filename
headers['Content-Length'] = os.path.getsize(filename)
data = None
else:
to zero here for certain status codes.
.. versionchanged:: 0.6
Previously that function was called `fix_headers` and modified
the response object in place. Also since 0.6, IRIs in location
and content-location headers are handled properly.
Also starting with 0.6, Werkzeug will attempt to set the content
length if it is able to figure it out on its own. This is the
case if all the strings in the response iterable are already
encoded and the iterable is buffered.
:param environ: the WSGI environment of the request.
:return: returns a new :class:`Headers` object.
"""
headers = Headers(self.headers)
# make sure the location header is an absolute URL
location = headers.get('location')
if location is not None:
if isinstance(location, unicode):
location = iri_to_uri(location)
headers['Location'] = urlparse.urljoin(
get_current_url(environ, root_only=True),
location
)
# make sure the content location is a URL
content_location = headers.get('content-location')
if content_location is not None and \
isinstance(content_location, unicode):
headers['Content-Location'] = iri_to_uri(content_location)