Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_predicate_when(mockevent):
called = []
assert When(Q(module='foo'), lambda ev: called.append(ev))(mockevent) == False
assert called == []
assert When(Q(module=__name__), lambda ev: called.append(ev))(mockevent) == True
assert called == [mockevent]
called = []
assert Q(module=__name__, action=lambda ev: called.append(ev))(mockevent) == True
assert called == [mockevent]
called = [[], []]
predicate = (
Q(module=__name__, action=lambda ev: called[0].append(ev)) |
Q(module='foo', action=lambda ev: called[1].append(ev))
)
assert predicate(mockevent) == True
assert called == [[mockevent], []]
assert predicate(mockevent) == True
assert called == [[mockevent, mockevent], []]
def test_predicate_q_expansion():
assert Q(C(1), C(2), module=3) == And(C(1), C(2), Q(module=3))
assert Q(C(1), C(2), module=3, action=C(4)) == When(And(C(1), C(2), Q(module=3)), C(4))
assert Q(C(1), C(2), module=3, actions=[C(4), C(5)]) == When(And(C(1), C(2), Q(module=3)), C(4), C(5))
# 'or' by expression
with trace(module='foo', function='foobar') as t:
assert t.handler == When(Q(module='foo', function='foobar'), CallPrinter)
# pdb.set_trace
with trace(function='foobar', action=Debugger) as t:
assert str(t.handler) == str(When(Q(function='foobar'), Debugger))
# pdb.set_trace on any hits
with trace(module='foo', function='foobar', action=Debugger) as t:
assert str(t.handler) == str(When(Q(module='foo', function='foobar'), Debugger))
# pdb.set_trace when function is foobar, otherwise just print when module is foo
with trace(Q(function='foobar', action=Debugger), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), Debugger),
Q(module='foo')
), CallPrinter))
# dumping variables from stack
with trace(Q(function='foobar', action=VarsPrinter('foobar')), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), VarsPrinter('foobar')),
Q(module='foo'),
), CallPrinter))
with trace(Q(function='foobar', action=VarsPrinter('foobar', 'mumbojumbo')), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), VarsPrinter('foobar', 'mumbojumbo')),
Q(module='foo'),
), CallPrinter))
# pdb.set_trace on any hits
with trace(module='foo', function='foobar', action=Debugger) as t:
assert str(t.handler) == str(When(Q(module='foo', function='foobar'), Debugger))
# pdb.set_trace when function is foobar, otherwise just print when module is foo
with trace(Q(function='foobar', action=Debugger), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), Debugger),
Q(module='foo')
), CallPrinter))
# dumping variables from stack
with trace(Q(function='foobar', action=VarsPrinter('foobar')), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), VarsPrinter('foobar')),
Q(module='foo'),
), CallPrinter))
with trace(Q(function='foobar', action=VarsPrinter('foobar', 'mumbojumbo')), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), VarsPrinter('foobar', 'mumbojumbo')),
Q(module='foo'),
), CallPrinter))
# multiple actions
with trace(Q(function='foobar', actions=[VarsPrinter('foobar'), Debugger]), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), VarsPrinter('foobar'), Debugger),
Q(module='foo'),
), CallPrinter))
# 'or' by expression
with trace(module='foo', function='foobar') as t:
assert t.handler == When(Q(module='foo', function='foobar'), CallPrinter)
# pdb.set_trace
with trace(function='foobar', action=Debugger) as t:
assert str(t.handler) == str(When(Q(function='foobar'), Debugger))
# pdb.set_trace on any hits
with trace(module='foo', function='foobar', action=Debugger) as t:
assert str(t.handler) == str(When(Q(module='foo', function='foobar'), Debugger))
# pdb.set_trace when function is foobar, otherwise just print when module is foo
with trace(Q(function='foobar', action=Debugger), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), Debugger),
Q(module='foo')
), CallPrinter))
# dumping variables from stack
with trace(Q(function='foobar', action=VarsPrinter('foobar')), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), VarsPrinter('foobar')),
Q(module='foo'),
), CallPrinter))
with trace(Q(function='foobar', action=VarsPrinter('foobar', 'mumbojumbo')), module='foo') as t:
assert str(t.handler) == str(When(And(
When(Q(function='foobar'), VarsPrinter('foobar', 'mumbojumbo')),
Q(module='foo'),
), CallPrinter))
if I.expires and now > I.expires:
continue
if I.check_call(self, *args, **kwargs):
instruments_by_event[I.event].append(I)
if I.event in ("call", "return"):
if "hotspots" in I.value or "hotspots" in (
I.custom.get("tags") or ""
):
hotspots.enabled = True
target_obj, target_func_name = self.target.rsplit(".", 1)
is_unwrapped = base.func_code.co_name == target_func_name
if instruments_by_event["end"]:
# We have instruments that require evaluation in the local
# context of the function. Call sys.settrace() to gain access.
predicate = hunter.When(
hunter.Query(
# Only trace returns (this will include exceptions!)...
kind="return",
# ...and only in the given function...
function=target_func_name,
# ...(no deeper).
depth=0,
)
if is_unwrapped
else hunter.Query(
# Only trace returns (this will include exceptions!)...
kind="return",
# ...and only in the given function...
function=target_func_name,
# ...but we don't know how many times it's been wrapped.
# Use the module instead as an approximate match.
),
TraceHandler(self, instruments_by_event["end"]),
)
tracer = hunter.Tracer(
# There's no need to call threading.settrace() because
# a) we're targeting a function we're about to call
# in the same thread,
# b) we're going to undo it immediately after, and
# c) it would collide with other threads if they did
# the same concurrently.
threading_support=False
).trace(predicate)
elif hotspots.enabled:
# We have instruments that require timing internal lines.
# Call sys.settrace() to gain access.
predicate = hunter.When(
hunter.Query(
# Only trace lines...
kind="line",
# ...and only in the given function...
function=target_func_name,
# ...(no deeper).
depth=1,
)
if is_unwrapped
else hunter.Query(
# Only trace lines...
kind="line",
# ...and only in the given function...
function=target_func_name,
# ...but we don't know how many times it's been wrapped.
# Use the module instead as an approximate match.