Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def encode_multipart(self, params, files):
"""
Encodes a set of parameters (typically a name/value list) and
a set of files (a list of (name, filename, file_body, mimetype)) into a
typical POST body, returning the (content_type, body).
"""
boundary = to_bytes(str(random.random()))[2:]
boundary = b'----------a_BoUnDaRy' + boundary + b'$'
lines = []
def _append_file(file_info):
key, filename, value, fcontent = self._get_file_info(file_info)
if isinstance(key, text_type):
try:
key = key.encode('ascii')
except: # pragma: no cover
raise # file name must be ascii
if isinstance(filename, text_type):
try:
filename = filename.encode('utf8')
except: # pragma: no cover
raise # file name must be ascii or utf8
if not fcontent:
def test_multiple_file_uploads_with_filename_and_contents(self):
uploaded_file1_name = os.path.join(os.path.dirname(__file__),
"__init__.py")
uploaded_file1_contents = open(uploaded_file1_name).read()
if PY3:
uploaded_file1_contents = to_bytes(uploaded_file1_contents)
uploaded_file2_name = __file__
uploaded_file2_name = os.path.join(os.path.dirname(__file__), 'html',
"404.html")
uploaded_file2_contents = open(uploaded_file2_name).read()
if PY3:
uploaded_file2_contents = to_bytes(uploaded_file2_contents)
app = webtest.TestApp(MultipleUploadFileApp())
res = app.get('/')
self.assertEqual(res.status_int, 200)
self.assertEqual(res.headers['content-type'],
'text/html; charset=utf-8')
self.assertEqual(res.content_type, 'text/html')
single_form = res.forms["file_upload_form"]
single_form.set("file-field-1",
if isinstance(params, dict) or hasattr(params, 'items'):
params = list(params.items())
if isinstance(params, (list, tuple)):
inline_uploads = [v for (k, v) in params
if isinstance(v, (forms.File, forms.Upload))]
if len(inline_uploads) > 0:
content_type, params = self.encode_multipart(
params, upload_files or ())
environ['CONTENT_TYPE'] = content_type
else:
params = utils.encode_params(params, content_type)
if upload_files or \
(content_type and
to_bytes(content_type).startswith(b'multipart')):
params = urlparse.parse_qsl(params, keep_blank_values=True)
content_type, params = self.encode_multipart(
params, upload_files or ())
environ['CONTENT_TYPE'] = content_type
elif params:
environ.setdefault('CONTENT_TYPE',
str('application/x-www-form-urlencoded'))
if content_type is not None:
environ['CONTENT_TYPE'] = content_type
environ['REQUEST_METHOD'] = str(method)
url = str(url)
url = self._remove_fragment(url)
req = self.RequestClass.blank(url, environ)
if isinstance(params, text_type):
params = params.encode(req.charset or 'utf8')
<option value="9">Nine</option>
<option value="10">Ten</option>
<option value="11">Eleven</option>
<input value="multiple" type="submit" name="button">
""")
else:
select_type = req.POST.get("button")
if select_type == "single":
selection = req.POST.get("single")
elif select_type == "multiple":
selection = ", ".join(req.POST.getall("multiple"))
body = to_bytes("""
<title>display page</title>
<p>You submitted the %(select_type)s </p>
<p>You selected %(selection)s</p>
""" % dict(selection=selection, select_type=select_type))
headers = [
('Content-Type', 'text/html; charset=utf-8'),
('Content-Length', str(len(body)))]
# PEP 3333 requires native strings:
headers = [(str(k), str(v)) for k, v in headers]
start_response(status, headers)
return [body]
def select_app_without_values(environ, start_response):
req = Request(environ)
status = b"200 OK"
if req.method == "GET":
body = to_bytes("""
<title>form page</title>
<form id="single_select_form" method="POST">
<select name="single" id="single">
<option>Four</option>
<option>Five</option>
<option>Six</option>
<option>Seven</option>
</select>
<input value="single" type="submit" name="button">
</form>
<form id="multiple_select_form" method="POST">
<select multiple="multiple" name="multiple" id="multiple">
<option>Eight</option>
<option value="Nine" selected="">Nine</option></select></form>
def cookie_app(environ, start_response):
    """
    Tiny WSGI application that answers every request with a plain-text
    body and a ``Set-Cookie`` header.

    The cookie sent is ``spam=eggs`` with the ``secure`` flag and
    ``Domain=.example.org``.  ``environ`` is never inspected.
    """
    status_line = to_bytes("200 OK")
    payload = 'Cookie.'
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Content-Length', str(len(payload))),
        ('Set-Cookie', 'spam=eggs; secure; Domain=.example.org;'),
    ]
    start_response(status_line, response_headers)
    # The body is returned as a one-element iterable, as WSGI expects.
    return [to_bytes(payload)]
for p in args['params']]
if 'upload_files' in args:
args['upload_files'] = [map(to_str, f)
for f in args['upload_files']]
if 'content_type' in args:
args['content_type'] = to_str(args['content_type'])
if method == 'get':
method = self.test_app.get
else:
method = self.test_app.post
return method(href, **args)
# Pattern for one or more consecutive space, newline, carriage-return or
# tab characters; the pattern text is passed through to_bytes() before
# being compiled.
_normal_body_regex = re.compile(to_bytes(r'[ \n\r\t]+'))
@property
def normal_body(self):
    """
    The body with each run matched by ``_normal_body_regex`` replaced by
    a single ``b' '``.

    The result is computed on first access and stored on
    ``self._normal_body``; later accesses return the stored value.
    """
    cached = getattr(self, '_normal_body', None)
    if cached is not None:
        return cached
    self._normal_body = self._normal_body_regex.sub(b' ', self.body)
    return self._normal_body
_unicode_normal_body_regex = re.compile('[ \\n\\r\\t]+')
@property
def unicode_normal_body(self):
"""
Return the whitespace-normalized body, as unicode
def get_files_page(self, req):
    """
    Build the HTML fragment (as bytes) describing the uploaded files.

    Every ``req.POST`` item whose key contains ``'file'`` is reported, in
    sorted ``(key, value)`` order.  Items that are ``cgi.FieldStorage``
    instances contribute their filename, value and type; any other item
    is reported with all three left empty.
    """
    file_fields = sorted(
        (field, item) for field, item in req.POST.items() if 'file' in field)
    fragments = []
    for _field, upload in file_fields:
        if not isinstance(upload, cgi.FieldStorage):
            name_bytes = data_bytes = type_bytes = b''
        else:
            name_bytes = to_bytes(upload.filename)
            data_bytes = to_bytes(upload.value, 'ascii')
            type_bytes = to_bytes(upload.type, 'ascii')
        # The literal's continuation lines start at column 0 on purpose:
        # any leading whitespace would become part of the emitted bytes.
        fragments.append(b"""
<p>You selected '""" + name_bytes + b"""'</p>
<p>with contents: '""" + data_bytes + b"""'</p>
<p>with content type: '""" + type_bytes + b"""'</p>
""")
    return b''.join(fragments)
def get_files_page(self, req):
    """
    Render, as bytes, one HTML snippet per posted field whose name
    contains ``'file'``.

    Fields are processed in sorted ``(name, value)`` order.  A field that
    is a ``cgi.FieldStorage`` is described by its filename, value and
    type; anything else yields a snippet with those three parts empty.
    The snippets are concatenated and returned.
    """
    rendered = []
    candidates = [(key, field) for key, field in req.POST.items()
                  if 'file' in key]
    candidates.sort()
    for _key, field in candidates:
        # Start from the "nothing uploaded" defaults, then fill them in
        # when the field really is an upload.
        shown_name = shown_value = shown_type = b''
        if isinstance(field, cgi.FieldStorage):
            shown_name = to_bytes(field.filename)
            shown_value = to_bytes(field.value, 'ascii')
            shown_type = to_bytes(field.type, 'ascii')
        # Continuation lines of the literal stay at column 0 so the
        # emitted bytes are unchanged.
        rendered.append(b"""
<p>You selected '""" + shown_name + b"""'</p>
<p>with contents: '""" + shown_value + b"""'</p>
<p>with content type: '""" + shown_type + b"""'</p>
""")
    return b''.join(rendered)
def set_authorization(self, value):
    """
    Set or clear the ``HTTP_AUTHORIZATION`` entry of ``self.extra_environ``.

    ``value`` is either ``None``, which removes the entry if present, or
    a two-item list/tuple ``(authtype, val)``:

    * ``('Basic', ('user', 'password'))`` -- the items of ``val`` are
      joined with ``':'`` and base64-encoded;
    * ``('Bearer', 'token')`` or ``('JWT', 'token')`` -- ``val`` is
      stripped of surrounding whitespace.

    The resulting header is ``'<authtype> <val>'``.  The original
    ``value`` is kept on ``self.authorization_value``.

    Raises ``ValueError`` for any other ``value``.  Validation runs before
    any state is modified, so a rejected value leaves both
    ``self.authorization_value`` and ``self.extra_environ`` untouched.
    """
    if value is None:
        self.authorization_value = None
        if 'HTTP_AUTHORIZATION' in self.extra_environ:
            del self.extra_environ['HTTP_AUTHORIZATION']
        return

    invalid_value = (
        "You should use a value like ('Basic', ('user', 'password'))"
        " OR ('Bearer', 'token') OR ('JWT', 'token')"
    )
    if not (isinstance(value, (list, tuple)) and len(value) == 2):
        raise ValueError(invalid_value)

    authtype, val = value
    if authtype == 'Basic' and val and isinstance(val, (list, tuple)):
        val = ':'.join(list(val))
        val = b64encode(to_bytes(val)).strip()
        val = val.decode('latin1')
    elif authtype in ('Bearer', 'JWT') and val and \
            isinstance(val, (str, text_type)):
        val = val.strip()
    else:
        raise ValueError(invalid_value)
    header = str('%s %s' % (authtype, val))

    # Only commit once the value is known to be valid, so that a failed
    # call cannot leave authorization_value out of sync with the header.
    self.authorization_value = value
    self.extra_environ.update({
        'HTTP_AUTHORIZATION': header,
    })