Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read_background(self):
    """Poll for incoming messages for as long as broadcasting is on.

    Meant to be run in a separate thread. Every pass calls
    ``self.read()``, sleeps for one fifteenth of ``self.period`` and
    then waits on ``self.awake`` before ``self.broadcasting`` is
    checked again. The polling frequency may need some tweaking
    depending on the hardware limitations and on the number of robots
    broadcasting.
    """
    while True:
        if not self.broadcasting:
            break
        self.read()
        # Assumed to be short enough to pick up all the messages.
        pause = self.period / 15.
        sleep(pause)
        self.awake.wait()
class XBeeExpirationNetwork(XBeeNetwork):
"""Extension of XBeeNetwork where the data
received is ignored after *expiration_time*
number of seconds since it was first sent
(according to the sender).
"""
def __init__(self, expiration_time, window_start, window_end,
             period=1, ID=None, lock=None):
    """Set up the expiration bookkeeping, then initialise the base network.

    *expiration_time* is stored unchanged in ``self.expiration_time``
    and ``self.expirations`` starts out as an empty dict. All the
    remaining arguments are forwarded positionally, in order, to
    ``XBeeNetwork.__init__``.
    """
    self.expirations = {}
    self.expiration_time = expiration_time
    XBeeNetwork.__init__(self, window_start, window_end, period, ID, lock)
def parse_state(self, message):
"""Parse a message containing x, y, theta, time, ID.
Store the expiration date of the message in self.expirations."""
try: