Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(*, backtrace, colorize, diagnose):
    """Call ``divide_indirect(10, 0)`` and log a resulting ZeroDivisionError.

    ``logger.remove()`` is called first, then a single stderr sink with an
    empty ``format`` is added using the supplied options.

    Args:
        backtrace: Forwarded to ``logger.add`` as ``backtrace``.
        colorize: Forwarded to ``logger.add`` as ``colorize``.
        diagnose: Forwarded to ``logger.add`` as ``diagnose``.
    """
    sink_options = {
        "format": "",
        "colorize": colorize,
        "backtrace": backtrace,
        "diagnose": diagnose,
    }
    logger.remove()
    logger.add(sys.stderr, **sink_options)

    try:
        divide_indirect(10, 0)
    except ZeroDivisionError:
        # Empty message: only the formatted exception is of interest here.
        logger.exception("")
def c():
    # Builds a tuple of int/float/str/raw-str/raw-bytes literals plus the
    # result of b(), then discards it; b() is the only call on the line.
    # b is not defined in this excerpt.
    # NOTE(review): the variety of literal kinds looks deliberate (fixture for
    # traceback rendering?) — confirm before simplifying the expression.
    1, 2.5, 3.0, 0.4, "str", r"rrr", rb"binary", b()
def d():
    # Evaluates a few builtin calls, then c(), then the Ellipsis literal, and
    # throws the resulting tuple away. Only c() reaches non-builtin code.
    # NOTE(review): presumably kept verbatim for its source text — confirm.
    min(range(1, 10)), list(), dict(), c(), ...
def e(x):
    # Membership tests of x against a list, a tuple, a set and a dict literal,
    # followed by a call to d(); the resulting tuple is discarded.
    # NOTE(review): presumably kept verbatim for its source text — confirm.
    x in [1], x in (1,), x in {1}, x in {1: 1}, d()
# Run e(0); a ZeroDivisionError escaping the call is logged (with its
# traceback) using an empty message. Other exception types are not handled.
try:
    e(0)
except ZeroDivisionError:
    logger.exception("")
def c():
    # Unconditionally raises ZeroDivisionError; the function never returns.
    1 / 0
# Three invocations in sequence; a_decorated and a_not_decorated are defined
# outside this excerpt.

# 1. Plain call of the decorated function.
a_decorated()

# 2. Undecorated function called inside the logger.catch() context manager.
# NOTE(review): presumably the context manager logs and suppresses the error
# so execution continues to step 3 — confirm against the loguru docs.
with logger.catch():
    a_not_decorated()

# 3. Undecorated function with an explicit handler that logs the
# ZeroDivisionError via logger.exception and an empty message.
try:
    a_not_decorated()
except ZeroDivisionError:
    logger.exception("")
def test_has_sys_real_prefix(writer, monkeypatch):
    """Check exception logging when ``sys.real_prefix`` is set.

    ``sys.real_prefix`` is patched to a dummy path before the sink is added,
    then a ZeroDivisionError is logged and the tail of the captured output is
    checked.

    Args:
        writer: Sink passed to ``logger.add``; its ``read()`` returns what
            was logged.
        monkeypatch: Used to set the attribute on ``sys`` for this test.
    """
    expected_tail = "ZeroDivisionError: division by zero\n"

    # raising=False: do not fail when ``sys`` has no ``real_prefix`` yet.
    # The patch is applied before logger.add on purpose; keep this order.
    monkeypatch.setattr(sys, "real_prefix", "/foo/bar/baz", raising=False)
    logger.add(writer, format="", colorize=False, diagnose=True, backtrace=False)

    try:
        1 / 0
    except ZeroDivisionError:
        logger.exception("")

    logged = writer.read()
    assert logged.endswith(expected_tail)
import sys
from loguru import logger
# Reset the logger, then attach one stderr sink: empty format, colour on,
# backtrace off, diagnose on.
logger.remove()
logger.add(sys.stderr, format="", colorize=True, backtrace=False, diagnose=True)
class Object:
    """Object whose ``repr()`` always fails with ``RuntimeError``."""

    def __repr__(self):
        # Deliberately unusable representation: every repr() attempt raises.
        message = "No way!"
        raise RuntimeError(message)
try:
    obj = Object()
    # Division binds tighter than addition, so ``1 / 0`` raises
    # ZeroDivisionError before the addition with ``obj`` is attempted.
    # NOTE(review): presumably this checks that logging the exception copes
    # with ``obj`` having a failing __repr__ — confirm.
    obj + 1 / 0
except ZeroDivisionError:
    logger.exception("")
# Two stderr sinks that differ only in ``backtrace`` (False, then True).
# NOTE(review): presumably each logged message is emitted once per sink so
# both renderings can be compared — confirm.
logger.add(sys.stderr, format="", colorize=False, backtrace=False, diagnose=False)
logger.add(sys.stderr, format="", colorize=False, backtrace=True, diagnose=False)
def foo():
    # Thin indirection: adds one extra frame between the caller and bar().
    bar()
# The decorator is given NotImplementedError, while the body raises
# ZeroDivisionError.
# NOTE(review): presumably the non-matching exception is left to propagate to
# the caller — confirm against the loguru ``logger.catch`` documentation.
@logger.catch(NotImplementedError)
def bar():
    1 / 0
# Call foo() (which calls bar()); a ZeroDivisionError that reaches this level
# is logged with its traceback and an empty message.
try:
    foo()
except ZeroDivisionError:
    logger.exception("")
def get_file(self, file_id, teamdrive_id=None, stream=True, headers=None, timeout=30):
    """Fetch a file through the ``drive_root`` drive; return ``None`` on failure.

    All arguments are forwarded unchanged to
    ``self.drives['drive_root'].get_file``.

    Args:
        file_id: Identifier of the file to fetch.
        teamdrive_id: Optional team-drive identifier, forwarded as-is.
        stream: Forwarded as-is (default ``True``).
        headers: Optional headers, forwarded as-is.
        timeout: Forwarded as-is (default ``30``).

    Returns:
        Whatever the underlying ``get_file`` returns, or ``None`` if any
        ``Exception`` was raised (including a missing ``drive_root`` entry).
    """
    try:
        return self.drives['drive_root'].get_file(
            file_id,
            teamdrive_id=teamdrive_id,
            stream=stream,
            headers=headers,
            timeout=timeout,
        )
    except Exception:
        # Best-effort lookup: record the full traceback and signal failure
        # with None instead of propagating to the caller.
        logger.exception(f"Exception getting file {file_id!r} - teamdrive_id {teamdrive_id}: ")
        return None
Returns:
A (unix-style) return code (0=normal, anything else is an error).
"""
retcode = ReturnCode.SUCCESS
try:
func(*args, **kwargs)
except KeyboardInterrupt:
logger.error("Interrupted")
retcode = ReturnCode.TERMINATED
except IOError as err:
if err.errno == errno.EPIPE:
retcode = ReturnCode.ERROR
else:
raise
except (AtroposError, EOFError):
logger.exception("Atropos error")
retcode = ReturnCode.ERROR
except: # pylint: disable=broad-except
logger.exception("Unknown error")
retcode = ReturnCode.ERROR
return retcode
async def cmd_rehash(context: Context):
    """ rehash the hash browns. (reloads config file)

    Re-runs ``setup`` on the config path taken from the CLI arguments and
    reports the outcome to the invoking user via ``context.reply``.

    Args:
        context: Command context; ``context.user.nickname`` is logged and
            ``context.reply`` is used for all user-facing responses.

    ``KeyError`` and ``ValueError`` raised by ``setup`` are logged and
    reported to the user; any other exception propagates to the caller.
    """
    logger.warning(f"config rehashing invoked by user {context.user.nickname}")
    path = cli_manager.GET_ARGUMENTS().config_file
    await context.reply("reloading configuration...")
    try:
        _, resulting_hash = setup(path)
    except (KeyError, ValueError):
        # emit stacktrace to logfile
        logger.exception("failed to rehash configuration.")
        # if you have access to mecha's configuration file, you have access to its logs.
        # no need to defer this to the top-level exception handler.
        await context.reply("unable to rehash configuration file see logfile for details.")
    else:
        # no errors, respond status OK with the first eight characters of the hash.
        await context.reply(f"rehashing completed successfully. ({resulting_hash[:8]}) ")
logger.info(f"Fetching extra results from page {pages}")
if 'params' in kwargs:
kwargs['params'].update({'pageToken': resp_json['nextPageToken']})
elif 'json' in kwargs:
kwargs['json'].update({'pageToken': resp_json['nextPageToken']})
elif 'data' in kwargs:
kwargs['data'].update({'pageToken': resp_json['nextPageToken']})
continue
break
return True if resp_json and len(resp_json) else False, resp, resp_json if (
resp_json and len(resp_json)) else resp.text
except Exception:
logger.exception(f"Exception sending request to {request_url} with kwargs={kwargs}: ")
return False, resp, None