Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_faketime_3():
"""Test calling the step function with varying lengths"""
wpilib.DriverStation._reset()
ft = FakeTime()
ft.initialize()
ft.set_time_limit(5)
sc = StepChecker()
ft.ds_cond._on_step = sc.on_step
ft.increment_time_by(0.005)
ft.increment_time_by(0.01)
ft.increment_time_by(0.02)
ft.increment_time_by(0.03)
ft.increment_time_by(0.04)
ft.increment_time_by(0.05)
tm = 0.005 + 0.01 + 0.02 + 0.03 + 0.04 + 0.05
assert_float(ft.get(), tm)
assert_float(sc.expected, 0.16)
def test_faketime_2():
"""Test calling the step function """
wpilib.DriverStation._reset()
ft = FakeTime()
ft.initialize()
ft.set_time_limit(5)
sc = StepChecker()
ft.ds_cond._on_step = sc.on_step
ft.increment_new_packet()
ft.increment_new_packet()
ft.increment_new_packet()
assert_float(ft.get(), 0.06)
assert_float(sc.expected, 0.06 + 0.02)
def test_faketime_1():
"""Test expiration"""
wpilib.DriverStation._reset()
ft = FakeTime()
ft.initialize()
ft.set_time_limit(5)
with pytest.raises(TestRanTooLong):
ft.increment_time_by(10)
child_threads = list(self._child_threads.items())
for thread, thread_info in child_threads:
thread_info["time"] -= timestep
if thread_info["time"] <= 0 and thread.is_alive():
self._children_not_running.clear()
thread_info["event"].set() # Wake it up
waiting_on.append(thread)
# Wait for everything we woke up to sleep again.
# Check that we did wake something up.
if waiting_on:
i = 0
while not self._children_not_running.wait(0.020):
i += 1
if i == self._freeze_detect_threshold:
raise TestFroze("Waiting on %s" % waiting_on)
# if this timed out, check to see if that particular
# child died...
for thread in waiting_on:
if thread.is_alive():
break # out of the for loop, at least one child is active
else:
break # out of the while loop, everyone is dead
def test_faketime_1():
"""Test expiration"""
wpilib.DriverStation._reset()
ft = FakeTime()
ft.initialize()
ft.set_time_limit(5)
with pytest.raises(TestRanTooLong):
ft.increment_time_by(10)
def test_two_motor_drivetrain(l_motor, r_motor, output):
result = drivetrains.TwoMotorDrivetrain(speed=1 * units.fps).calculate(
l_motor, r_motor
)
result = (result.vx_fps, result.omega)
assert abs(result[0] - output[0]) < 0.001
assert abs(result[1] - output[1]) < 0.001
def test_tankdrive_get_distance():
"""Test TankModel.get_distance() by driving in a quarter circle.
If the turn angle is 90deg, then x should equal y. get_distance() runs in little steps,
so multiple calls should be equivalent to a single call"""
# just use the default robot
tank = tankmodel.TankModel.theory(
motor_cfgs.MOTOR_CFG_CIM,
robot_mass=90 * units.lbs,
gearing=10.71,
nmotors=2,
x_wheelbase=2.0 * units.feet,
wheel_diameter=6 * units.inch,
)
# slight turn to the right
l_motor = 1.0
r_motor = -0.9
# get the motors up to speed, so that subsequent calls produce the same result
tank.calculate(l_motor, r_motor, 2.0)
# figure out how much time is needed to turn 90 degrees
angle = 0.0
angle_target = -math.pi / 2.0 # 90 degrees
total_time = 0.0
def __init__(self, robot_class, robot_file, robot_path):
self.robot_class = robot_class
self._robot_file = robot_file
self._robot_path = robot_path
# Setup fake time
self._fake_time = fake_time.FakeTime()
# Setup control instance
self._control = None
self._started = False
# Setup the hal hooks so we can control time
# -> The hook doesn't have any state, so we initialize it only once
hal_impl.functions.hooks = pyfrc_fake_hooks.PyFrcFakeHooks(self._fake_time)
def test_faketime_threading():
"""Test that threads are being caught and paused correctly."""
wpilib.DriverStation._reset()
ft = FakeTime()
ft.initialize()
incr_thread100hz = IncrementingThread(0.01, ft)
incr_thread20hz = IncrementingThread(0.05, ft)
incr_thread100hz.start()
incr_thread20hz.start()
for _ in range(4):
ft.increment_new_packet()
assert incr_thread100hz.counter == 8
assert incr_thread20hz.counter == 1
ft.teardown()
incr_thread100hz.cancel()
incr_thread20hz.cancel()
def test_faketime_dying_thread():
"""Test that dying threads are handled properly"""
wpilib.DriverStation._reset()
ft = FakeTime()
ft.initialize()
dt = DyingThread(ft)
dt.start()
for _ in range(4):
ft.increment_new_packet()
assert dt._died == True