Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_update_loop(thread, update_func):
""" Makes a run loop which calls an update function at a predefined frequency. """
while not thread.should_stop():
if thread.should_pause():
thread.wait_to_resume()
start = time.time()
if hasattr(thread, '_updated'):
thread._updated.clear()
update_func()
if hasattr(thread, '_updated'):
thread._updated.set()
end = time.time()
dt = thread.period - (end - start)
if dt > 0:
time.sleep(dt)
def _wrapped_update(self):
logger.debug('LoopPrimitive %s updated.', self)
self._recent_updates.append(time.time())
self.update()
def _prim_setup(self):
logger.info("Primitive %s setup.", self)
for m in self.robot.motors:
m._to_set.clear()
self.robot._primitive_manager.add(self)
self.setup()
self.t0 = time.time()
def elapsed_time(self):
return time.time() - self.t0
def elapsed_time(self):
""" Elapsed time (in seconds) since the primitive runs. """
return time.time() - self.t0
def setup(self):
if self.duration < numpy.finfo(float).eps:
self.motor.goal_position = self.goal
self.stop()
else:
self.trajs = MinimumJerkTrajectory(self.motor.present_position, self.goal, self.duration).get_generator()
self.t0 = time.time()
def make_update_loop(thread, update_func):
""" Makes a run loop which calls an update function at a predefined frequency. """
while not thread.should_stop():
if thread.should_pause():
thread.wait_to_resume()
start = time.time()
if hasattr(thread, '_updated'):
thread._updated.clear()
update_func()
if hasattr(thread, '_updated'):
thread._updated.set()
end = time.time()
dt = thread.period - (end - start)
if dt > 0:
time.sleep(dt)