Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_thread_filtering(LineMatcher, query):
lines = StringIO()
idents = set()
names = set()
started = threading.Event()
def record(event):
idents.add(event.threadid)
names.add(event.threadname)
return True
with hunter.trace(~Q(**query), record,
actions=[CodePrinter(stream=lines), VarsPrinter('a', stream=lines), CallPrinter(stream=lines)],
threading_support=True):
def foo(a=1):
started.set()
print(a)
def main():
foo()
t = threading.Thread(target=foo)
t.start()
started.wait(10)
main()
lm = LineMatcher(lines.getvalue().splitlines())
print(lines.getvalue())
def test_threading_support(LineMatcher):
lines = StringIO()
idents = set()
names = set()
started = threading.Event()
def record(event):
idents.add(event.threadid)
names.add(event.threadname)
return True
with hunter.trace(record,
actions=[CodePrinter(stream=lines), VarsPrinter('a', stream=lines), CallPrinter(stream=lines)],
threading_support=True):
def foo(a=1):
started.set()
print(a)
def main():
foo()
t = threading.Thread(target=foo)
t.start()
started.wait(10)
main()
lm = LineMatcher(lines.getvalue().splitlines())
assert idents - {t.ident} == {None}
def test_tracing_reinstall(LineMatcher):
lines = StringIO()
with hunter.trace(CodePrinter(stream=lines)):
def foo():
a = 2
sys.settrace(sys.gettrace())
a = 3
def bar():
a = 1
foo()
a = 4
bar()
print(lines.getvalue())
lm = LineMatcher(lines.getvalue().splitlines())
lm.fnmatch_lines([
"*test_hunter.py:* call def bar():",
"*test_hunter.py:* line a = 1",
def test_debugger(LineMatcher):
out = StringIO()
calls = []
class FakePDB:
def __init__(self, foobar=1):
calls.append(foobar)
def set_trace(self, frame):
calls.append(frame.f_code.co_name)
with hunter.trace(
lambda event: event.locals.get('node') == 'Foobar',
module='test_hunter',
function='foo',
actions=[CodePrinter,
VarsPrinter('a', 'node', 'foo', 'test_debugger', stream=out),
Debugger(klass=FakePDB, foobar=2)]
):
def foo():
a = 1
node = 'Foobar'
node += 'x'
a += 2
return a
foo()
print(out.getvalue())
def test_trace_merge():
with hunter.trace(function='a'):
with hunter.trace(function='b'):
with hunter.trace(function='c'):
assert sys.gettrace().handler == When(Q(function='c'), CallPrinter)
assert sys.gettrace().handler == When(Q(function='b'), CallPrinter)
assert sys.gettrace().handler == When(Q(function='a'), CallPrinter)
def test_tracing_bare(LineMatcher):
lines = StringIO()
with hunter.trace(CodePrinter(stream=lines)):
def a():
return 1
b = a()
b = 2
try:
raise Exception('BOOM!')
except Exception:
pass
print(lines.getvalue())
lm = LineMatcher(lines.getvalue().splitlines())
lm.fnmatch_lines([
"*test_hunter.py* call def a():",
"*test_hunter.py* line return 1",
"*test_hunter.py* return return 1",
"* ... return value: 1",
def test_tracer_autostop():
with hunter.trace(lambda: garbage) as tracer:
if os.environ.get("SETUPPY_CFLAGS") == "-DCYTHON_TRACE=1":
assert sys.gettrace() is not tracer
else:
assert sys.gettrace() is None
def __init__(self, *args, **kwargs):
self._calls = []
threading_support = kwargs.pop('threading_support', False)
clear_env_var = kwargs.pop('clear_env_var', False)
self.handler = hunter._prepare_predicate(*args, **kwargs)
self._tracer = hunter.trace(self._append, threading_support=threading_support, clear_env_var=clear_env_var)
def test_examples():
print("""
CodePrinter
""")
with hunter.trace(stdlib=False, actions=[CodePrinter, VarsSnooper]):
os.path.join(*map(str, range(10)))
print("""
CallPrinter
""")
with hunter.trace(stdlib=False, actions=[CallPrinter, VarsSnooper]):
os.path.join(*map(str, range(10)))
def test_dump_exceptions(LineMatcher):
stream = StringIO()
with hunter.trace(stdlib=False, action=DumpExceptions(stream=stream)):
silenced1()
silenced2()
silenced3()
silenced4()
print("Done silenced")
try:
notsilenced()
print("Done not silenced")
except ValueError:
pass
lm = LineMatcher(stream.getvalue().splitlines())
lm.fnmatch_lines([
'*>>>>>>>>>>>>>>>>>>>>>> tracing silenced1 on RuntimeError()',
'*test_cookbook.py:*** exception error()',
'* *** ... exception value: *RuntimeError*',