Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_threading_support(LineMatcher):
    """Trace a worker thread and the calling thread with ``threading_support=True``.

    A predicate that accepts every event records the ``threadid`` and
    ``threadname`` of each event it sees.  Afterwards the test checks that,
    once the worker thread's ident is removed, the only recorded thread id
    left is ``None``, and that ``'MainThread'`` is among the recorded names.
    """
    output = StringIO()
    seen_idents = set()
    seen_names = set()
    started = threading.Event()

    def collect(event):
        # Accept every event, remembering which thread produced it.
        seen_idents.add(event.threadid)
        seen_names.add(event.threadname)
        return True

    printers = [
        CodePrinter(stream=output),
        VarsPrinter('a', stream=output),
        CallPrinter(stream=output),
    ]
    with hunter.trace(collect, actions=printers, threading_support=True):
        # The statements below are the code under trace; they are kept
        # exactly as written so the traced source does not change.
        def foo(a=1):
            started.set()
            print(a)

        def main():
            foo()

        t = threading.Thread(target=foo)
        t.start()
        started.wait(10)
        main()

    lm = LineMatcher(output.getvalue().splitlines())
    assert seen_idents - {t.ident} == {None}
    assert 'MainThread' in seen_names
def test_thread_filtering(LineMatcher, query):
    """Trace with the negated ``query`` filter and check no event has a ``None`` thread id.

    ``query`` is expanded into ``Q(**query)`` and negated; the recording
    predicate then notes the ``threadid`` / ``threadname`` of every event
    that gets past that filter.  The captured printer output is echoed to
    stdout before the final assertion.
    """
    output = StringIO()
    seen_idents = set()
    seen_names = set()
    started = threading.Event()

    def collect(event):
        # Accept every event that reaches us, remembering its thread.
        seen_idents.add(event.threadid)
        seen_names.add(event.threadname)
        return True

    negated_query = ~Q(**query)
    printers = [
        CodePrinter(stream=output),
        VarsPrinter('a', stream=output),
        CallPrinter(stream=output),
    ]
    with hunter.trace(negated_query, collect, actions=printers, threading_support=True):
        # Code under trace: kept exactly as written.
        def foo(a=1):
            started.set()
            print(a)

        def main():
            foo()

        t = threading.Thread(target=foo)
        t.start()
        started.wait(10)
        main()

    # The buffer is read twice on purpose, mirroring the original sequence
    # (the worker thread is never joined, so the reads are kept separate).
    lm = LineMatcher(output.getvalue().splitlines())
    print(output.getvalue())
    assert None not in seen_idents
def test_tracing_vars(LineMatcher):
lines = StringIO()
with hunter.trace(actions=[VarsPrinter('b', stream=lines), CodePrinter(stream=lines)]):
def a():
b = 1
b = 2
return 1
b = a()
b = 2
try:
raise Exception('BOOM!')
except Exception:
pass
print(lines.getvalue())
lm = LineMatcher(lines.getvalue().splitlines())
lm.fnmatch_lines([
"*test_hunter.py* call def a():",
"*test_hunter.py* line b = 1",
def test_tracing_vars_expressions(LineMatcher):
    """Check that ``VarsPrinter`` reports the values of full expressions.

    The printer is given attribute lookups, calls and subscripts rather than
    bare names.  After tracing a function that defines a class ``Foo`` with
    ``bar = 1``, the captured output must contain one matching line per
    expression, in any order.
    """
    lines = StringIO()
    expressions = ('Foo.bar', 'vars(Foo)', 'len(range(2))', 'Foo.__dict__["bar"]')
    printer = VarsPrinter(*expressions, stream=lines)

    with hunter.trace(actions=[printer]):
        # Code under trace: kept exactly as written.
        def main():
            class Foo(object):
                bar = 1

        main()

    captured = lines.getvalue()
    print(captured)
    lm = LineMatcher(captured.splitlines())

    # ``[[]`` and ``[]]`` are fnmatch escapes for literal ``[`` and ``]``.
    expected_patterns = [
        '* [[]Foo.bar => 1[]]',
        '* [[]vars(Foo) => *[]]',
        '* [[]len(range(2)) => 2[]]',
        '* [[]Foo.__dict__[[]"bar"[]] => 1[]]',
    ]
    lm.fnmatch_lines_random(expected_patterns)
# Keyword filters plus an action are expected to collapse into a single
# When(Q(...), action) handler.
with trace(function='foobar', action=Debugger) as t:
    actual = str(t.handler)
    expected = str(When(Q(function='foobar'), Debugger))
    assert actual == expected

# Debugger action on any hit, this time filtering on module and function.
with trace(module='foo', function='foobar', action=Debugger) as t:
    actual = str(t.handler)
    expected = str(When(Q(module='foo', function='foobar'), Debugger))
    assert actual == expected

# Debugger when the function is foobar, otherwise just print when the module
# is foo: the Q carrying its own action is And-ed with the keyword filter and
# the whole thing is wrapped in a When whose action is CallPrinter.
with trace(Q(function='foobar', action=Debugger), module='foo') as t:
    actual = str(t.handler)
    inner = When(Q(function='foobar'), Debugger)
    expected = str(When(And(inner, Q(module='foo')), CallPrinter))
    assert actual == expected

# Dumping variables from the stack: one variable name ...
with trace(Q(function='foobar', action=VarsPrinter('foobar')), module='foo') as t:
    actual = str(t.handler)
    inner = When(Q(function='foobar'), VarsPrinter('foobar'))
    expected = str(When(And(inner, Q(module='foo')), CallPrinter))
    assert actual == expected

# ... and several variable names at once.
with trace(Q(function='foobar', action=VarsPrinter('foobar', 'mumbojumbo')), module='foo') as t:
    actual = str(t.handler)
    inner = When(Q(function='foobar'), VarsPrinter('foobar', 'mumbojumbo'))
    expected = str(When(And(inner, Q(module='foo')), CallPrinter))
    assert actual == expected
# multiple actions
with trace(Q(function='foobar', actions=[VarsPrinter('foobar'), Debugger]), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), VarsPrinter('foobar'), Debugger),
Q(module='foo'),
# Dumping variables from the stack: one variable name ...
with trace(Q(function='foobar', action=VarsPrinter('foobar')), module='foo') as t:
    actual = str(t.handler)
    inner = When(Q(function='foobar'), VarsPrinter('foobar'))
    expected = str(When(And(inner, Q(module='foo')), CallPrinter))
    assert actual == expected

# ... and several variable names at once.
with trace(Q(function='foobar', action=VarsPrinter('foobar', 'mumbojumbo')), module='foo') as t:
    actual = str(t.handler)
    inner = When(Q(function='foobar'), VarsPrinter('foobar', 'mumbojumbo'))
    expected = str(When(And(inner, Q(module='foo')), CallPrinter))
    assert actual == expected

# Multiple actions on one Q: the expected inner When lists every action.
with trace(Q(function='foobar', actions=[VarsPrinter('foobar'), Debugger]), module='foo') as t:
    actual = str(t.handler)
    inner = When(Q(function='foobar'), VarsPrinter('foobar'), Debugger)
    expected = str(When(And(inner, Q(module='foo')), CallPrinter))
    assert actual == expected
def test_probe(impl, benchmark):
    """Benchmark ``bar`` while the probe built by ``impl`` is active.

    :param impl: Callable returning a context manager; it is called with the
        dotted path ``'<this module>.baz'``, a ``VarsPrinter`` for ``'foo'``
        and the keyword arguments ``kind="return"`` and ``depth=0``.
    :param benchmark: Callable that is handed ``bar`` to run and time.
    """
    # Own the devnull handle with a context manager so it is closed once the
    # benchmark is over; previously the file was opened inline and never
    # closed, leaking one descriptor per invocation.
    with open(os.devnull, 'w') as stream:
        with impl('%s.baz' % __name__, hunter.VarsPrinter('foo', stream=stream), kind="return", depth=0):
            benchmark(bar)
def run():
    """Run ``_bulky_func_that_use_stdlib`` under a call-only trace.

    Only ``call`` events are selected, excluding modules named ``re``,
    ``sre`` or ``sre_parse`` and modules whose name starts with
    ``namedtuple``.  Matching events go to a ``CodePrinter`` and a
    ``VarsPrinter('line')`` that both write into an in-memory buffer.
    """
    output = StringIO()

    ignored_modules = ['re', 'sre', 'sre_parse']
    call_filter = (
        ~Q(module_in=ignored_modules)
        & ~Q(module_startswith='namedtuple')
        & Q(kind='call')
    )
    printers = [
        CodePrinter(stream=output),
        VarsPrinter('line', stream=output),
    ]

    with t.trace(Q(call_filter, actions=printers)):
        _bulky_func_that_use_stdlib()
def test_tracing_printing_failures(LineMatcher):
lines = StringIO()
with trace(actions=[CodePrinter(stream=lines, repr_func=repr), VarsPrinter('x', stream=lines, repr_func=repr)]):
class Bad(object):
__slots__ = []
def __repr__(self):
raise RuntimeError("I'm a bad class!")
def a():
x = Bad()
return x
def b():
x = Bad()
raise Exception(x)
a()
try:
out = StringIO()
calls = []
class FakePDB:
def __init__(self, foobar=1):
calls.append(foobar)
def set_trace(self, frame):
calls.append(frame.f_code.co_name)
with hunter.trace(
lambda event: event.locals.get('node') == 'Foobar',
module='test_hunter',
function='foo',
actions=[CodePrinter,
VarsPrinter('a', 'node', 'foo', 'test_debugger', stream=out),
Debugger(klass=FakePDB, foobar=2)]
):
def foo():
a = 1
node = 'Foobar'
node += 'x'
a += 2
return a
foo()
print(out.getvalue())
assert calls == [2, 'foo']
lm = LineMatcher(out.getvalue().splitlines())
pprint(lm.lines)
lm.fnmatch_lines_random([
"* [[]test_debugger =>