Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def write_header(self, line, nl):
line = line.decode(self.charset or 'latin1')
if not nl: raise MultipartError('Unexpected end of line in header.')
if not line.strip(): # blank line -> end of header segment
self.finish_header()
elif line[0] in ' \t' and self.headerlist:
name, value = self.headerlist.pop()
self.headerlist.append((name, value+line.strip()))
else:
if ':' not in line:
raise MultipartError("Syntax error in header: No colon.")
name, value = line.split(':', 1)
self.headerlist.append((name.strip(), value.strip()))
def write_header(self, line, nl):
line = line.decode(self.charset or 'latin1')
if not nl: raise MultipartError('Unexpected end of line in header.')
if not line.strip(): # blank line -> end of header segment
self.finish_header()
elif line[0] in ' \t' and self.headerlist:
name, value = self.headerlist.pop()
self.headerlist.append((name, value+line.strip()))
else:
if ':' not in line:
raise MultipartError("Syntax error in header: No colon.")
name, value = line.split(':', 1)
self.headerlist.append((name.strip(), value.strip()))
def write_body(self, line, nl):
if not line and not nl: return # This does not even flush the buffer
self.size += len(line) + len(self._buf)
self.file.write(self._buf + line)
self._buf = nl
if self.content_length > 0 and self.size > self.content_length:
raise MultipartError('Size of body exceeds Content-Length header.')
if self.size > self.memfile_limit and isinstance(self.file, BytesIO):
# TODO: What about non-file uploads that exceed the memfile_limit?
self.file, old = TemporaryFile(mode='w+b'), self.file
old.seek(0)
copy_file(old, self.file, self.size, self.buffer_size)
''' Parse a multipart/form-data byte stream. This object is an iterator
over the parts of the message.
:param stream: A file-like stream. Must implement ``.read(size)``.
:param boundary: The multipart boundary as a byte string.
:param content_length: The maximum number of bytes to read.
'''
self.stream, self.boundary = stream, boundary
self.content_length = content_length
self.disk_limit = disk_limit
self.memfile_limit = memfile_limit
self.mem_limit = min(mem_limit, self.disk_limit)
self.buffer_size = min(buffer_size, self.mem_limit)
self.charset = charset
if self.buffer_size - 6 < len(boundary): # "--boundary--\r\n"
raise MultipartError('Boundary does not fit into buffer_size.')
self._done = []
self._part_iter = None
mem_used += part.size
else:
disk_used += part.size
part.file.seek(0)
yield part
part = MultipartPart(**opts)
else:
is_tail = not nl # The next line continues this one
part.feed(line, nl)
if part.is_buffered():
if part.size + mem_used > self.mem_limit:
raise MultipartError("Memory limit reached.")
elif part.size + disk_used > self.disk_limit:
raise MultipartError("Disk limit reached.")
if line != terminator:
raise MultipartError("Unexpected end of multipart stream.")
(text_type) and lists as values (multiple values per key are possible).
The forms-dictionary contains form-field values as text_type strings.
The files-dictionary contains :class:`MultipartPart` instances, either
because the form-field was a file-upload or the value is to big to fit
into memory limits.
:param environ: An WSGI environment dict.
:param charset: The charset to use if unsure. (default: utf8)
:param strict: If True, raise :exc:`MultipartError` on any parsing
errors. These are silently ignored by default.
'''
forms, files = MultiDict(), MultiDict()
try:
if environ.get('REQUEST_METHOD', 'GET').upper() not in ('POST', 'PUT'):
raise MultipartError("Request method other than POST or PUT.")
content_length = int(environ.get('CONTENT_LENGTH', '-1'))
content_type = environ.get('CONTENT_TYPE', '')
if not content_type:
raise MultipartError("Missing Content-Type header.")
content_type, options = parse_options_header(content_type)
stream = environ.get('wsgi.input') or BytesIO()
kw['charset'] = charset = options.get('charset', charset)
if content_type == 'multipart/form-data':
boundary = options.get('boundary', '')
if not boundary:
raise MultipartError("No boundary for multipart/form-data.")
for part in MultipartParser(stream, boundary, content_length, **kw):
if part.filename or not part.is_buffered():
files[part.name] = part
else: # TODO: Big form-fields are in the files dict. really?
forms[part.name] = part.value
def finish_header(self):
self.file = BytesIO()
self.headers = Headers(self.headerlist)
cdis = self.headers.get('Content-Disposition','')
ctype = self.headers.get('Content-Type','')
clen = self.headers.get('Content-Length','-1')
if not cdis:
raise MultipartError('Content-Disposition header is missing.')
self.disposition, self.options = parse_options_header(cdis)
self.name = self.options.get('name')
self.filename = self.options.get('filename')
self.content_type, options = parse_options_header(ctype)
self.charset = options.get('charset') or self.charset
self.content_length = int(self.headers.get('Content-Length','-1'))
elif line == separator and not is_tail:
if part.is_buffered(): mem_used += part.size
else: disk_used += part.size
part.file.seek(0)
yield part
part = MultipartPart(**opts)
else:
is_tail = not nl # The next line continues this one
part.feed(line, nl)
if part.is_buffered():
if part.size + mem_used > self.mem_limit:
raise MultipartError("Memory limit reached.")
elif part.size + disk_used > self.disk_limit:
raise MultipartError("Disk limit reached.")
if line != terminator:
raise MultipartError("Unexpected end of multipart stream.")
def finish_header(self):
self.file = BytesIO()
self.headers = Headers(self.headerlist)
cdis = self.headers.get('Content-Disposition', '')
ctype = self.headers.get('Content-Type', '')
self.headers.get('Content-Length', '-1')
if not cdis:
raise MultipartError('Content-Disposition header is missing.')
self.disposition, self.options = parse_options_header(cdis)
self.name = self.options.get('name')
self.filename = self.options.get('filename')
self.content_type, options = parse_options_header(ctype)
self.charset = options.get('charset') or self.charset
self.content_length = int(self.headers.get('Content-Length', '-1'))
def write_body(self, line, nl):
if not line and not nl:
return # This does not even flush the buffer
self.size += len(line) + len(self._buf)
self.file.write(self._buf + line)
self._buf = nl
if self.content_length > 0 and self.size > self.content_length:
raise MultipartError('Size of body exceeds Content-Length header.')
if self.size > self.memfile_limit and isinstance(self.file, BytesIO):
# TODO: What about non-file uploads that exceed the memfile_limit?
self.file, old = TemporaryFile(mode='w+b'), self.file
old.seek(0)
copy_file(old, self.file, self.size, self.buffer_size)