Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _uavcan_thread(self):
while True:
try:
with self.node_lock:
self.node.spin(0.1)
time.sleep(0.01)
except uavcan.UAVCANException as ex:
print('Node error:', ex)
# self._uavcan_exit()
def run(self):
self.sliding_window_lp = SlidingWindowLowPass()
self.spectral_centroid = SpectralCentroid()
last_delay = 0.
filtered_delay = 0.
fitted_measurements = 0
slow_queue = deque(maxlen=20)
fast_queue = deque(maxlen=5)
while True:
try:
self.node.spin(0.1)
except uavcan.UAVCANException:
pass
temperature = calculate_temperature(calculate_ntc_resistance(self.recorder.temperature))
params, cov = fit_signal(self.recorder.time, self.recorder.signal)
if params:
# Put the delay through a bandpass
delay = params[1]
fast_queue.append(delay)
slow_queue.append(delay)
fast_avg = sum(fast_queue) / len(fast_queue)
slow_avg = sum(slow_queue) / len(slow_queue)
filtered_delay = fast_avg - slow_avg
if filtered_delay < -4e-3:
def node_id(self, value):
if self.is_anonymous:
value = int(value)
if not (1 <= value <= 127):
raise ValueError('Invalid Node ID [%d]' % value)
self._node_id = value
else:
raise UAVCANException('Node ID can be set only once')
def call(transfer):
event = TransferEvent(transfer, self._node, 'request' if service else 'message')
result = handler(event, **kwargs)
if service:
if result is None:
raise UAVCANException('Service request handler did not return a response [%r, %r]' %
(uavcan_type, handler))
self._node.respond(result,
transfer.source_node_id,
transfer.transfer_id,
transfer.transfer_priority)
else:
if result is not None:
raise UAVCANException('Message request handler did not return None [%r, %r]' %
(uavcan_type, handler))
def _throw_if_anonymous(self):
if not self._node_id:
raise uavcan.UAVCANException('The local node is configured in anonymous mode')
def __init__(self, node, node_monitor, database_storage=None, dynamic_node_id_range=None):
"""
:param node: Node instance.
:param node_monitor: Instance of NodeMonitor.
:param database_storage: Path to the file where the instance will keep the allocation table.
If not provided, the allocation table will be kept in memory.
:param dynamic_node_id_range: Range of node ID available for dynamic allocation; defaults to [1, 125].
"""
if node.is_anonymous:
raise UAVCANException('Dynamic node ID server cannot be launched on an anonymous node')
self._node_monitor = node_monitor
self._allocation_table = CentralizedServer.AllocationTable(database_storage or self.DATABASE_STORAGE_MEMORY)
self._query = bytes()
self._query_timestamp = 0
self._node_monitor_event_handle = node_monitor.add_update_handler(self._handle_monitor_event)
self._dynamic_node_id_range = dynamic_node_id_range or CentralizedServer.DEFAULT_NODE_ID_RANGE
self._handle = node.add_handler(uavcan.protocol.dynamic_node_id.Allocation, # @UndefinedVariable
self._on_allocation_message)
self._allocation_table.set(node.node_info.hardware_version.unique_id.to_bytes(), node.node_id)
# Initializing the table
for entry in node_monitor.find_all(lambda _: True):
def __init__(self, node, lookup_paths=None):
if node.is_anonymous:
raise uavcan.UAVCANException('File server cannot be launched on an anonymous node')
self.lookup_paths = lookup_paths or []
self._path_hit_counters = defaultdict(int)
self._handles = []
def add_handler(datatype, callback):
self._handles.append(node.add_handler(datatype, callback))
add_handler(uavcan.protocol.file.GetInfo, self._get_info)
add_handler(uavcan.protocol.file.Read, self._read)
# TODO: support all file services
def __init__(self):
if sys.version_info[0] > 2:
# Nice and easy.
self._scheduler = sched.scheduler()
# The documentation says that run() returns the next deadline,
# but it's not true - it returns the remaining time.
self._run_scheduler = lambda: self._scheduler.run(blocking=False) + self._scheduler.timefunc()
else:
# Nightmare inducing hacks
class SayNoToBlockingSchedulingException(uavcan.UAVCANException):
pass
def delayfunc_impostor(duration):
if duration > 0:
raise SayNoToBlockingSchedulingException('No!')
self._scheduler = sched.scheduler(time.monotonic, delayfunc_impostor)
def run_scheduler():
try:
self._scheduler.run()
except SayNoToBlockingSchedulingException:
q = self._scheduler.queue
return q[0][0] if q else None
self._run_scheduler = run_scheduler