Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _write_details(self, details):
    """Output details to the stream as a multipart section.

    Writes the `` [ multipart`` opener, then, for every entry of
    ``details`` in sorted order: a ``Content-Type: type/subtype`` header
    (followed by ``;`` and comma-separated ``param=value`` pairs when the
    content type has parameters), the entry name on its own line, and the
    content bytes passed through a ``chunked.Encoder``.

    :param details: An extended details dict for a test outcome. Each value
        must expose ``content_type`` (with ``type``, ``subtype`` and
        ``parameters``) and ``iter_bytes()``.
    """
    stream = self._stream
    stream.write(_b(" [ multipart\n"))
    for name, content in sorted(details.items()):
        content_type = content.content_type
        stream.write(_b("Content-Type: %s/%s" %
            (content_type.type, content_type.subtype)))
        parameters = content_type.parameters
        if parameters:
            stream.write(_b(";"))
            param_strs = [
                "%s=%s" % (param, value)
                for param, value in parameters.items()
            ]
            stream.write(_b(",".join(param_strs)))
        stream.write(_b("\n%s\n" % name))
        # Each part gets its own encoder, closed once the part is complete.
        encoder = chunked.Encoder(stream)
        for chunk in content.iter_bytes():
            encoder.write(chunk)
        encoder.close()
# NOTE(review): this is a fragment of a method body -- the ``def`` line that
# binds ``outcome``, ``test``, ``error``, ``error_permitted`` and ``details``
# is not visible here, and the original indentation has been lost.
#
# Start the outcome line: "<outcome>: " followed by the result of
# ``self._test_id(test)``.
self._stream.write(_b("%s: " % outcome) + self._test_id(test))
# Argument validation (reading the ``else`` as paired with the outer ``if`` --
# confirm against the indented original): when an error is permitted, at
# least one of ``error``/``details`` must be supplied; when it is not
# permitted, ``error`` must be None. Violations raise a bare ValueError.
if error_permitted:
if error is None and details is None:
raise ValueError
else:
if error is not None:
raise ValueError
# Body of the outcome: a traceback built from ``error`` takes precedence over
# ``details``; with neither, the outcome line is just ended with a newline.
if error is not None:
self._stream.write(self._start_simple)
tb_content = TracebackContent(error, test)
for bytes in tb_content.iter_bytes():
self._stream.write(bytes)
elif details is not None:
self._write_details(details)
else:
self._stream.write(_b("\n"))
# Whenever a body was written above, finish it with ``self._end_simple``.
if details is not None or error is not None:
self._stream.write(self._end_simple)
# NOTE(review): this is a fragment of a method body -- the ``def`` line and
# whatever binds ``err``, ``test``, ``test_id``, ``now``, ``status`` and
# ``reason`` are not visible here, and the original indentation has been lost.
# NOTE(review): as shown, ``details`` can never be None by the time of the
# ``if details is not None`` check below, which suggests an enclosing guard
# around the next three lines was lost -- confirm against the original.
#
# Store a traceback content object for ``err`` under the 'traceback' key,
# creating the details dict first if there is none.
if details is None:
details = {}
details['traceback'] = TracebackContent(err, test)
# Report every detail as a series of file status events. The inner loop keeps
# a one-chunk lookahead: a chunk is only sent once the next one has arrived,
# so the last chunk (or the value of ``_b("")`` when the content yields no
# chunks) can be sent with ``eof=True``.
if details is not None:
for name, content in details.items():
mime_type = repr(content.content_type)
file_bytes = None
for next_bytes in content.iter_bytes():
if file_bytes is not None:
self.status(
file_name=name, file_bytes=file_bytes,
mime_type=mime_type, test_id=test_id,
timestamp=now)
file_bytes = next_bytes
if file_bytes is None:
file_bytes = _b("")
self.status(
file_name=name, file_bytes=file_bytes, eof=True,
mime_type=mime_type, test_id=test_id, timestamp=now)
# A reason, when given, is sent as a complete UTF-8 text file named 'reason'.
if reason is not None:
self.status(
file_name='reason', file_bytes=reason.encode('utf8'),
eof=True, mime_type="text/plain; charset=utf8",
test_id=test_id, timestamp=now)
# Finally report the status of the test itself, with the current tags.
self.status(
test_id=test_id, test_status=status,
test_tags=self.current_tags, timestamp=now)
def _iter_text(self):
    """Worker for iter_text - does the decoding.

    Decodes the chunks produced by ``self.iter_bytes()`` with an
    incremental decoder, so the decoder can carry state from one chunk to
    the next. The encoding is the ``charset`` parameter of
    ``self.content_type``, falling back to ``ISO-8859-1`` when absent.

    :return: A generator yielding the decoded text for each chunk, followed
        by any non-empty text produced by the final flush.
    """
    encoding = self.content_type.parameters.get('charset', 'ISO-8859-1')
    decoder = codecs.getincrementaldecoder(encoding)()
    for chunk in self.iter_bytes():
        yield decoder.decode(chunk)
    # Signal end of input (final=True) so the decoder emits, or rejects,
    # anything it is still holding.
    final = decoder.decode(_b(''), True)
    if final:
        yield final
def time(self, a_datetime):
    """Inform the client of the time.

    The datetime is converted with ``astimezone(iso8601.Utc())`` and written
    to the stream as ``time: YYYY-MM-DD HH:MM:SS.ffffffZ`` followed by a
    newline.

    :param a_datetime: A datetime.datetime object.
    """
    utc_time = a_datetime.astimezone(iso8601.Utc())
    self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
        utc_time.year, utc_time.month, utc_time.day, utc_time.hour,
        utc_time.minute, utc_time.second, utc_time.microsecond)))
from extras import try_import
from testtools.compat import (
_b,
_format_exception_only,
_format_stack_list,
_TB_HEADER,
_u,
str_is_unicode,
)
from testtools.content_type import ContentType, JSON, UTF8_TEXT
# Import ``functools`` through ``extras.try_import`` rather than a plain
# import statement.
# NOTE(review): presumably this binds None when the module is unavailable --
# confirm against the extras documentation.
functools = try_import('functools')
# Bound ``join`` method of the ``_b("")`` value, for concatenating chunks.
_join_b = _b("").join
# NOTE(review): presumably the default read size, in bytes, for chunked
# reads -- no user of this constant is visible here; confirm.
DEFAULT_CHUNK_SIZE = 4096
# %-format templates with a single ``%s`` placeholder; each starts with a
# newline followed by a section heading line.
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
def _iter_chunks(stream, chunk_size, seek_offset=None, seek_whence=0):
"""Read 'stream' in chunks of 'chunk_size'.
:param stream: A file-like object to read from.
:param chunk_size: The size of each read from 'stream'.
:param seek_offset: If non-None, seek before iterating.
:param seek_whence: Pass through to the seek call, if seeking.
"""
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Handlers for outcome details."""
from testtools import content, content_type
from testtools.compat import _b, BytesIO
from subunit import chunked
# Line that terminates a details section; SimpleDetailsParser.lineReceived
# compares each incoming line against it.
end_marker = _b("]\n")
# NOTE(review): no use is visible in this excerpt; presumably the prefix of a
# quoted "]" line inside details -- confirm.
quoted_marker = _b(" ]")
# The ``_b('')`` value.
# NOTE(review): presumably an empty byte string used as a join separator --
# confirm.
empty = _b('')
class DetailsParser(object):
    """Base class/API reference for details parsing.

    Defines no behaviour of its own; concrete parsers derive from it.
    """
class SimpleDetailsParser(DetailsParser):
    """Parser for single-part [] delimited details."""

    def __init__(self, state):
        """Create a parser with an empty message.

        :param state: Object whose ``endDetails()`` method is called when
            the end marker line is received.
        """
        self._message = _b("")
        self._state = state

    def lineReceived(self, line):
        """Handle one line of details input.

        A line equal to ``end_marker`` ends the details section by calling
        ``self._state.endDetails()``.

        NOTE(review): in the code visible here every other line is ignored
        and ``self._message`` is never extended -- confirm that nothing was
        truncated from this method.

        :param line: The received line.
        """
        if line == end_marker:
            self._state.endDetails()
def flush(self, extra_len=0):
    """Flush the encoder to the output stream.

    Writes a chunk header (the chunk length as upper-case hexadecimal
    followed by CRLF) and then the buffered bytes, if any.

    :param extra_len: Increase the size of the chunk by this many bytes
        to allow for a subsequent write.
    :return: None when there was nothing buffered and ``extra_len`` is
        zero (nothing is written); True once a chunk header has been
        written.
    """
    if not self.buffer_size and not extra_len:
        return
    pending_bytes = self.buffered_bytes
    pending_size = self.buffer_size
    # The buffer is reset before anything is written to the output.
    self.buffered_bytes = []
    self.buffer_size = 0
    # The advertised length includes extra_len, which the caller is expected
    # to write itself after this call.
    self.output.write(_b("%X\r\n" % (pending_size + extra_len)))
    if pending_size:
        self.output.write(empty.join(pending_bytes))
    return True