Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _add_reason(self, reason):
    """Attach ``reason`` to this object as a plain-text detail named 'reason'.

    The text is encoded as UTF-8 lazily, when the detail's bytes are
    requested. The content type declares ``charset=utf8`` so that readers
    decode the bytes with the codec they were encoded with; without an
    explicit charset, testtools falls back to ISO-8859-1 when turning the
    detail back into text, which garbles any non-ASCII reason.

    :param reason: The reason text; it must support ``.encode('utf8')``.
    """
    reason_type = content.ContentType('text', 'plain', {'charset': 'utf8'})
    self.addDetail(
        'reason',
        content.Content(reason_type, lambda: [reason.encode('utf8')]))
def text_content(text):
    """Wrap a piece of text in a UTF-8 ``Content`` object.

    Handy for attaching short strings as details. The text is only encoded
    when the content's bytes are actually requested.

    :param text: The text to wrap.
    :raises TypeError: If ``text`` is not text according to ``istext``.
    """
    if istext(text):
        return Content(UTF8_TEXT, lambda: [text.encode('utf8')])
    given = type(text).__name__
    raise TypeError(
        "text_content must be given text, not '%s'." % given
    )
def _setUp(self):
# Every message handed to the destination is appended to this list.
self.logs = []
add_destination(self.logs.append)
# Unregister the destination again at cleanup time.
self.addCleanup(lambda: remove_destination(self.logs.append))
# The detail body is a lambda, so _eliottree() runs on whatever has been
# collected by the time the detail is read, not on the empty list here.
self.addDetail(
self.LOG_DETAIL_NAME,
Content(
UTF8_TEXT,
lambda: [_eliottree(self.logs)],
),
# Compute the split only once; later calls reuse the value stored in
# split_logs[0].
if split_logs[0] is None:
split_logs[0] = _split_map_maybe(
_get_eliot_data, _iter_content_lines(twisted_log),
)
return split_logs[0]
# The trick here is that we can't iterate over the base detail yet.
# We can only use it inside the iter_bytes of the Content objects
# that we add. This is because the only time that we *know* the
# details are populated is when the details are evaluated. If we call
# it in _setUp(), the logs are empty. If we call it in cleanup, the
# detail is gone.
# First element of the split result, fetched lazily inside the lambda.
detailed.addDetail(
detail_name,
Content(UTF8_TEXT, lambda: _get_split_logs()[0]))
# Second element of the split result, passed through
# _prettyformat_lines() when the detail is evaluated.
detailed.addDetail(
self._ELIOT_LOG_DETAIL_NAME,
Content(
UTF8_TEXT, lambda: _prettyformat_lines(_get_split_logs()[1])))
# Mime type text/x-traceback, tagged as Python and declared as utf8.
content_type = ContentType('text', 'x-traceback',
{"language": "python", "charset": "utf8"})
# The full text is assembled right away; only the UTF-8 encoding is
# deferred to the lambda handed to the base class.
value = prefix_content + \
self._stack_lines_to_unicode(stack_lines) + \
postfix_content
super(StackLinesContent, self).__init__(
content_type, lambda: [value.encode("utf8")])
def _stack_lines_to_unicode(self, stack_lines):
    """Render pre-processed stack lines as a single unicode string.

    :param stack_lines: Stack entries in the form accepted by
        ``traceback.format_list``.
    :return: The formatted entries concatenated with no separator.
    """
    formatted = traceback.format_list(stack_lines)
    joiner = _u('')
    return joiner.join(formatted)
class TracebackContent(Content):
"""Content object for tracebacks.
This adapts an exc_info tuple to the 'Content' interface.
'text/x-traceback;language=python' is used for the mime type, in order to
provide room for other languages to format their tracebacks differently.
"""
def __init__(self, err, test, capture_locals=False):
"""Create a TracebackContent for ``err``.
:param err: An exc_info error tuple.
:param test: A test object used to obtain failureException.
:param capture_locals: If true, show locals in the traceback.
"""
# Reject a missing exc_info tuple up front with a ValueError.
if err is None:
raise ValueError("err may not be None")
def _add_reason(self, reason):
    """Attach ``reason`` to this object as a plain-text detail named 'reason'.

    The text is encoded as UTF-8 lazily, when the detail's bytes are
    requested. The content type declares ``charset=utf8`` so that readers
    decode the bytes with the codec they were encoded with; without an
    explicit charset, testtools falls back to ISO-8859-1 when turning the
    detail back into text, which garbles any non-ASCII reason.

    :param reason: The reason text; it must support ``.encode('utf8')``.
    """
    reason_type = content.ContentType('text', 'plain', {'charset': 'utf8'})
    self.addDetail(
        'reason',
        content.Content(reason_type, lambda: [reason.encode('utf8')]))
runnable=True, file_name=None, file_bytes=None, eof=False,
mime_type=None, route_code=None, timestamp=None):
# Pass the event to the parent implementation with the same arguments.
super(Starts, self).status(
test_id, test_status,
test_tags=test_tags, runnable=runnable, file_name=file_name,
file_bytes=file_bytes, eof=eof, mime_type=mime_type,
route_code=route_code, timestamp=timestamp)
# An event without a test id is only of interest for its file bytes,
# which are decoded and written to self._output.
if not test_id:
if not file_bytes:
return
# NOTE(review): 'test/plain;charset=utf8' looks like a misspelling of
# 'text/plain' that is being normalised here on purpose -- confirm.
if not mime_type or mime_type == 'test/plain;charset=utf8':
mime_type = 'text/plain; charset=utf-8'
primary, sub, parameters = mimeparse.parse_mime_type(mime_type)
content_type = testtools.content_type.ContentType(
primary, sub, parameters)
content = testtools.content.Content(
content_type, lambda: [file_bytes])
text = content.as_text()
# Record that the text did not end in a line break, so that a newline
# is written before the next test line (see the _neednewline check below).
if text and text[-1] not in '\r\n':
self._neednewline = True
self._output.write(text)
# A test reported as 'inprogress' whose id is not in self._emitted.
elif test_status == 'inprogress' and test_id not in self._emitted:
if self._neednewline:
self._neednewline = False
self._output.write('\n')
# Build a '(<name>) ' label from a 'worker-' tag; tag[7:] drops that
# prefix. If several tags match, the last one wins.
worker = ''
for tag in test_tags or ():
if tag.startswith('worker-'):
worker = '(' + tag[7:] + ') '
if timestamp:
timestr = timestamp.isoformat()
else:
def _feed_chunks(self, line):
    """Feed one line to the chunk parser and finish the detail when done.

    While the parser's ``write`` returns None nothing else happens. Once it
    returns a value, the accumulated body is stored in ``self._details``
    under ``self._name``, the parser is closed and the parse state is set
    back to ``self._look_for_content``.

    :param line: The next line of input to hand to the chunk parser.
    """
    leftover = self._chunk_parser.write(line)
    if leftover is None:
        return
    # Line based use always ends on no residue.
    assert leftover == empty, 'residue: %r' % (leftover,)
    # Bind the current body to a local so the lambda keeps referring to it
    # even if self._body is reassigned afterwards.
    finished_body = self._body
    detail = content.Content(
        self._content_type, lambda: [finished_body.getvalue()])
    self._details[self._name] = detail
    self._chunk_parser.close()
    self._parse_state = self._look_for_content
runnable=True, file_name=None, file_bytes=None, eof=False,
mime_type=None, route_code=None, timestamp=None):
# Pass the event to the parent implementation with the same arguments.
super(Starts, self).status(
test_id, test_status,
test_tags=test_tags, runnable=runnable, file_name=file_name,
file_bytes=file_bytes, eof=eof, mime_type=mime_type,
route_code=route_code, timestamp=timestamp)
# An event without a test id is only of interest for its file bytes,
# which are decoded and written to self._output.
if not test_id:
if not file_bytes:
return
# NOTE(review): 'test/plain;charset=utf8' looks like a misspelling of
# 'text/plain' that is being normalised here on purpose -- confirm.
if not mime_type or mime_type == 'test/plain;charset=utf8':
mime_type = 'text/plain; charset=utf-8'
primary, sub, parameters = mimeparse.parse_mime_type(mime_type)
content_type = testtools.content_type.ContentType(
primary, sub, parameters)
content = testtools.content.Content(
content_type, lambda: [file_bytes])
text = content.as_text()
# Record that the text did not end in a line break, so that a newline
# is written before the next test line (see the _neednewline check below).
if text and text[-1] not in '\r\n':
self._neednewline = True
self._output.write(text)
# A test reported as 'inprogress' whose id is not in self._emitted.
elif test_status == 'inprogress' and test_id not in self._emitted:
if self._neednewline:
self._neednewline = False
self._output.write('\n')
# Build a '(<name>) ' label from a 'worker-' tag; tag[7:] drops that
# prefix. If several tags match, the last one wins.
worker = ''
for tag in test_tags or ():
if tag.startswith('worker-'):
worker = '(' + tag[7:] + ') '
if timestamp:
timestr = timestamp.isoformat()
else:
runnable=True, file_name=None, file_bytes=None, eof=False,
mime_type=None, route_code=None, timestamp=None):
# Pass the event to the parent implementation with the same arguments.
super(Starts, self).status(
test_id, test_status,
test_tags=test_tags, runnable=runnable, file_name=file_name,
file_bytes=file_bytes, eof=eof, mime_type=mime_type,
route_code=route_code, timestamp=timestamp)
# An event without a test id is only of interest for its file bytes,
# which are decoded and written to self._output.
if not test_id:
if not file_bytes:
return
# NOTE(review): 'test/plain;charset=utf8' looks like a misspelling of
# 'text/plain' that is being normalised here on purpose -- confirm.
if not mime_type or mime_type == 'test/plain;charset=utf8':
mime_type = 'text/plain; charset=utf-8'
primary, sub, parameters = mimeparse.parse_mime_type(mime_type)
content_type = testtools.content_type.ContentType(
primary, sub, parameters)
content = testtools.content.Content(
content_type, lambda: [file_bytes])
text = content.as_text()
# Record that the text did not end in a line break, so that a newline
# is written before the next test line (see the _neednewline check below).
if text and text[-1] not in '\r\n':
self._neednewline = True
self._output.write(text)
# A test reported as 'inprogress' whose id is not in self._emitted.
elif test_status == 'inprogress' and test_id not in self._emitted:
if self._neednewline:
self._neednewline = False
self._output.write('\n')
# Build a '(<name>) ' label from a 'worker-' tag; tag[7:] drops that
# prefix. If several tags match, the last one wins.
worker = ''
for tag in test_tags or ():
if tag.startswith('worker-'):
worker = '(' + tag[7:] + ') '
if timestamp:
timestr = timestamp.isoformat()
else: