Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
# We add handlers that store root- and base-logger output
self.rootBuffer = StringIO()
rootLogger = logging.getLogger()
self.prevRootLogLevel = rootLogger.getEffectiveLevel()
self.rootLogHandler = logging.StreamHandler(self.rootBuffer)
rootLogger.addHandler(self.rootLogHandler)
self.baseBuffer = StringIO()
baseLogger = logging.getLogger(BASE_LOGGER_NAME)
self.prevBaseLogLevel = baseLogger.getEffectiveLevel()
self.baseLogHandler = logging.StreamHandler(self.baseBuffer)
baseLogger.addHandler(self.baseLogHandler)
def setUp(self):
# We add handlers that store root- and base-logger output
self.rootBuffer = StringIO()
rootLogger = logging.getLogger()
self.prevRootLogLevel = rootLogger.getEffectiveLevel()
self.rootLogHandler = logging.StreamHandler(self.rootBuffer)
rootLogger.addHandler(self.rootLogHandler)
self.baseBuffer = StringIO()
baseLogger = logging.getLogger(BASE_LOGGER_NAME)
self.prevBaseLogLevel = baseLogger.getEffectiveLevel()
self.baseLogHandler = logging.StreamHandler(self.baseBuffer)
baseLogger.addHandler(self.baseLogHandler)
# We have there environment, now invoke the application
_logger.debug("runWSGIApp application()...")
result = application(env, self.wsgiStartResponse)
try:
for data in result:
if data:
self.wsgiWriteData(data)
else:
_logger.debug("runWSGIApp empty data")
finally:
_logger.debug("runWSGIApp finally.")
if hasattr(result, "close"):
result.close()
except Exception:
_logger.debug("runWSGIApp caught exception...")
errorMsg = compat.StringIO()
traceback.print_exc(file=errorMsg)
logging.error(errorMsg.getvalue())
if not self.wsgiSentHeaders:
self.wsgiStartResponse(
"500 Server Error", [("Content-type", "text/html")]
)
self.wsgiWriteData(SERVER_ERROR)
if not self.wsgiSentHeaders:
# GC issue 29 sending one byte, when content-length is '0' seems wrong
# We must write out something!
# self.wsgiWriteData (" ")
self.wsgiWriteData(b"")
return
def get_content(self):
return compat.StringIO(self.content)
def get_content(self):
"""Open content as a stream for reading.
See DAVResource.get_content()
"""
filestream = compat.StringIO()
tableName, primKey = self.provider._split_path(self.path)
if primKey is not None:
conn = self.provider._init_connection()
listFields = self.provider._get_field_list(conn, tableName)
csvwriter = csv.DictWriter(filestream, listFields, extrasaction="ignore")
dictFields = {}
for field_name in listFields:
dictFields[field_name] = field_name
csvwriter.writerow(dictFields)
if primKey == "_ENTIRE_CONTENTS":
cursor = conn.cursor(MySQLdb.cursors.DictCursor)
cursor.execute("SELECT * from " + self.provider._db + "." + tableName)
result_set = cursor.fetchall()
for row in result_set:
def get_content(self):
"""Open content as a stream for reading.
See DAVResource.get_content()
"""
assert not self.is_collection
d = self.fctx.data()
return compat.StringIO(d)
def get_content(self):
html = "<pre>" + pformat(self.doc) + "</pre>"
return compat.StringIO(html.encode("utf8"))
def element_content_as_string(element):
"""Serialize etree.Element.
Note: element may contain more than one child or only text (i.e. no child
at all). Therefore the resulting string may raise an exception, when
passed back to etree.XML().
"""
if len(element) == 0:
return element.text or "" # Make sure, None is returned as ''
stream = compat.StringIO()
for childnode in element:
stream.write(xml_to_bytes(childnode, pretty_print=False) + "\n")
# print(xml_to_bytes(childnode, pretty_print=False), file=stream)
s = stream.getvalue()
stream.close()
return s