Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_input_session(self,
                      specifier: pyuavcan.transport.InputSessionSpecifier,
                      payload_metadata: pyuavcan.transport.PayloadMetadata) -> CANInputSession:
    """
    Return the input session registered for ``specifier``, creating and registering one if none exists yet.

    Background information is available in the base class documentation.
    The hardware acceptance filters are reconfigured automatically every time an input session is created
    or destroyed. Computing the new configuration and deploying it on the CAN controller may be slow.

    :param specifier: The key used to look up the session in the input dispatch table.
    :param payload_metadata: Passed to the constructor of a newly created session;
        not used when a session for the specifier already exists.
    :return: The existing session for the specifier, or the newly created one.
    :raises pyuavcan.transport.ResourceClosedError: If the media instance is no longer available.
    """
    if self._maybe_media is None:
        raise pyuavcan.transport.ResourceClosedError(f'{self} is closed')

    existing = self._input_dispatch_table.get(specifier)
    if existing is not None:
        return existing

    def on_session_finalized() -> None:
        # Handed to the session as its finalizer: unregisters the session and refreshes the filters.
        self._input_dispatch_table.remove(specifier)
        self._reconfigure_acceptance_filters()

    created = CANInputSession(specifier=specifier,
                              payload_metadata=payload_metadata,
                              loop=self._loop,
                              finalizer=on_session_finalized)
    self._input_dispatch_table.add(created)
    # The set of sessions has changed, so the acceptance filter configuration has to be recomputed.
    self._reconfigure_acceptance_filters()
    return created
def _raise_if_closed(self) -> None:
    """
    Do nothing while the close finalizer is still set; otherwise report that the session is closed.

    :raises pyuavcan.transport.ResourceClosedError: If the close finalizer is None.
    """
    if self._close_finalizer is not None:
        return
    raise pyuavcan.transport.ResourceClosedError(f'Session is closed: {self}')
async def task_function() -> None:
    """
    Receive loop: await the next (message, transfer) pair and pass it to the handler, repeatedly.

    The loop stops when the closed flag is set, when the task is cancelled,
    or when a resource closed error is raised.
    An exception raised by the handler (other than cancellation) is logged and the loop continues.
    Any other failure is logged and followed by a one second pause before the next attempt.
    """
    # This could be an interesting opportunity for optimization: instead of using the queue, just let the
    # implementation class invoke the handler from its own receive task directly. Eliminates extra indirection.
    while True:
        if self._closed:
            return
        try:
            message, transfer = await self.receive()
            try:
                await handler(message, transfer)
            except asyncio.CancelledError:
                # Cancellation must not be treated as a handler failure; let the outer clause deal with it.
                raise
            except Exception as handler_error:
                _logger.exception('%s got an unhandled exception in the message handler: %s', self, handler_error)
        except asyncio.CancelledError:
            _logger.debug('%s receive task cancelled', self)
            return
        except pyuavcan.transport.ResourceClosedError as closed_error:
            _logger.info('%s receive task got a resource closed error and will exit: %s', self, closed_error)
            return
        except Exception as loop_error:
            _logger.exception('%s receive task failure: %s', self, loop_error)
            await asyncio.sleep(1)  # TODO is this an adequate failure management strategy?
def _raise_if_closed(self) -> None:
    """
    Do nothing while the close finalizer is still set; otherwise report that the session is closed.

    :raises pyuavcan.transport.ResourceClosedError: If the close finalizer is None.
    """
    if self._close_finalizer is not None:
        return
    raise pyuavcan.transport.ResourceClosedError(
        f'The requested action cannot be performed because the session object {self} is closed'
    )
async def task_function() -> None:
    """
    Serving loop: keep awaiting ``serve_for`` with the handler and the listen-forever timeout.

    The loop stops when the closed flag is set, when the task is cancelled,
    or when a resource closed error is raised.
    Any other failure is logged and followed by a one second pause before the next attempt.
    """
    while True:
        if self._closed:
            return
        try:
            await self.serve_for(handler, _LISTEN_FOREVER_TIMEOUT)
        except asyncio.CancelledError:
            _logger.debug('%s task cancelled', self)
            return
        except pyuavcan.transport.ResourceClosedError as closed_error:
            _logger.info('%s task got a resource closed error and will exit: %s', self, closed_error)
            return
        except Exception as loop_error:
            _logger.exception('%s task failure: %s', self, loop_error)
            await asyncio.sleep(1)  # TODO is this an adequate failure management strategy?
def _ensure_not_closed(self) -> None:
    """
    Do nothing while the closed flag is not set; otherwise report that this object is closed.

    :raises pyuavcan.transport.ResourceClosedError: If the closed flag is set.
    """
    if not self._closed:
        return
    raise pyuavcan.transport.ResourceClosedError(f'{self} is closed')
#
# Copyright (c) 2019 UAVCAN Development Team
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
#
import pyuavcan.transport
class PresentationSessionClosedError(pyuavcan.transport.ResourceClosedError):
    """
    Signals an attempt to use a presentation-layer session instance that has already been closed.

    This type specializes the corresponding transport-layer error type,
    :class:`pyuavcan.transport.ResourceClosedError`.
    Closing the same instance more than once is NOT an error and does not cause this exception to be raised.
    """
class RequestTransferIDVariabilityExhaustedError(pyuavcan.transport.TransportError):
    """
    Signals an attempt to invoke more concurrent requests than the transport layer supports.

    For CAN, the limit is 32. For some transports the number is unlimited; technically a limit always exists,
    but for some transports, such as the serial transport, it cannot be reached in practice.
    """
def add_listener(self, source_node_id: typing.Optional[int], handler: Listener) -> None:
    """
    Register a frame listener for the given source node-ID.

    :param source_node_id: Frames received from this node-ID are passed to the listener.
        None selects the promiscuous mode, where the listener is invoked for all source node-IDs.
        At most one listener may exist per source node-ID value, and None counts as one such value,
        so there may be at most one promiscuous listener.

    :param handler: A callable of type :attr:`Listener` that receives the frames.
        When a received frame cannot be parsed, the callable is invoked with None
        so that it can update its error statistics.

    :raises pyuavcan.transport.ResourceClosedError: If this object is closed.
    :raises ValueError: If a listener for the given source node-ID is already registered.
    """
    if self._closed:
        raise pyuavcan.transport.ResourceClosedError(f'{self} is closed')

    already_registered = source_node_id in self._listeners
    if already_registered:
        current_handler = self._listeners[source_node_id]
        raise ValueError(f'{self}: The listener for node-ID {source_node_id} is already registered '
                         f'with handler {current_handler}')

    self._listeners[source_node_id] = handler
    _logger.debug('%r: Adding listener %r for node-ID %r', self, handler, source_node_id)
async def send_until(self, transfer: pyuavcan.transport.Transfer, monotonic_deadline: float) -> bool:
if self._closed:
raise pyuavcan.transport.ResourceClosedError(f'{self} is closed')
def construct_frame(index: int, end_of_transfer: bool, payload: memoryview) -> UDPFrame:
return UDPFrame(timestamp=transfer.timestamp,
priority=transfer.priority,
transfer_id=transfer.transfer_id,
index=index,
end_of_transfer=end_of_transfer,
payload=payload,
data_type_hash=self._payload_metadata.data_type_hash)
frames = [
fr.compile_header_and_payload()
for fr in
pyuavcan.transport.commons.high_overhead_transport.serialize_transfer(
transfer.fragmented_payload,
self._mtu,