Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
child_threads = list(self._child_threads.items())
for thread, thread_info in child_threads:
thread_info["time"] -= timestep
if thread_info["time"] <= 0 and thread.is_alive():
self._children_not_running.clear()
thread_info["event"].set() # Wake it up
waiting_on.append(thread)
# Wait for everything we woke up to sleep again.
# Check that we did wake something up.
if waiting_on:
i = 0
while not self._children_not_running.wait(0.020):
i += 1
if i == self._freeze_detect_threshold:
raise TestFroze("Waiting on %s" % waiting_on)
# if this timed out, check to see if that particular
# child died...
for thread in waiting_on:
if thread.is_alive():
break # out of the for loop, at least one child is active
else:
break # out of the while loop, everyone is dead
def test_faketime_infinite_loop_thread():
    """Test that infinite loops are detected.

    Starts an ``InfiniteLoopThread`` bound to a ``FakeTime`` instance whose
    ``_freeze_detect_threshold`` has been lowered to 5, then expects
    ``TestFroze`` to be raised while time is advanced with
    ``increment_new_packet()``.
    """
    wpilib.DriverStation._reset()

    ft = FakeTime()
    # Lowered so the freeze is reported quickly; presumably this is the
    # number of polls FakeTime makes before raising TestFroze -- confirm
    # against the FakeTime implementation.
    ft._freeze_detect_threshold = 5
    ft.initialize()

    it = InfiniteLoopThread(ft)
    it.start()

    # Both calls belong inside the ``raises`` suite: whichever one trips the
    # freeze detector raises TestFroze, and a call placed after the suite
    # would raise it again with nothing to catch it.
    with pytest.raises(TestFroze):
        ft.increment_new_packet()
        ft.increment_new_packet()

    # Notify every waiter on the thread's condition. Presumably this lets the
    # looping thread finish -- confirm against InfiniteLoopThread.
    with it.cond:
        it.cond.notify_all()