Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _unittest_frame_check() -> None:
from pytest import raises
from pyuavcan.transport import Priority, MessageDataSpecifier, ServiceDataSpecifier, Timestamp
_ = SerialFrame(timestamp=Timestamp.now(),
priority=Priority.HIGH,
source_node_id=123,
destination_node_id=456,
data_specifier=MessageDataSpecifier(12345),
data_type_hash=0xdead_beef_bad_c0ffe,
transfer_id=1234567890123456789,
index=1234567,
end_of_transfer=False,
payload=memoryview(b'abcdef'))
with raises(ValueError):
SerialFrame(timestamp=Timestamp.now(),
priority=Priority.HIGH,
source_node_id=123456,
destination_node_id=456,
data_specifier=MessageDataSpecifier(12345),
def construct_frame(index: int, end_of_transfer: bool, payload: memoryview) -> SerialFrame:
if not end_of_transfer and local_node_id is None:
raise pyuavcan.transport.OperationNotDefinedForAnonymousNodeError(
f'Anonymous nodes cannot emit multi-frame transfers. Session specifier: {self._specifier}')
return SerialFrame(timestamp=transfer.timestamp,
priority=transfer.priority,
transfer_id=transfer.transfer_id,
index=index,
end_of_transfer=end_of_transfer,
payload=payload,
source_node_id=local_node_id,
destination_node_id=self._specifier.remote_node_id,
data_specifier=self._specifier.data_specifier,
data_type_hash=self._payload_metadata.data_type_hash)
def _process_byte(self, b: int, timestamp: pyuavcan.transport.Timestamp) -> None:
# Reception of a frame delimiter terminates the current frame unconditionally.
if b == SerialFrame.FRAME_DELIMITER_BYTE:
self._finalize(known_invalid=not self._is_inside_frame())
self._current_frame_timestamp = timestamp
return
# Unescaping is done only if we're inside a frame currently.
if self._is_inside_frame():
if b == SerialFrame.ESCAPE_PREFIX_BYTE:
self._unescape_next = True
return
if self._unescape_next:
self._unescape_next = False
b ^= 0xFF
# Appending to the buffer always, regardless of whether we're in a frame or not.
# We may find out that the data does not belong to the protocol only much later; can't look ahead.
self._frame_buffer.append(b)
index=1234567,
end_of_transfer=False,
payload=memoryview(b''))
buffer = bytearray(0 for _ in range(50))
mv = f.compile_into(buffer)
assert mv[0] == mv[-1] == SerialFrame.FRAME_DELIMITER_BYTE
segment = bytes(mv[1:-1])
assert SerialFrame.FRAME_DELIMITER_BYTE not in segment
# Header validation
assert segment[0] == _VERSION
assert segment[1] == int(Priority.FAST)
assert segment[2] == SerialFrame.ESCAPE_PREFIX_BYTE
assert (segment[3], segment[4]) == (SerialFrame.FRAME_DELIMITER_BYTE ^ 0xFF, 0)
assert (segment[5], segment[6]) == (0xFF, 0xFF)
assert segment[7:9] == ((1 << 15) | (1 << 14) | 123) .to_bytes(2, 'little')
assert segment[9:17] == 0xdead_beef_bad_c0ffe .to_bytes(8, 'little')
assert segment[17:25] == 1234567890123456789 .to_bytes(8, 'little')
assert segment[25:29] == 1234567 .to_bytes(4, 'little')
# Header CRC here
# CRC validation
assert segment[33:] == pyuavcan.transport.commons.crc.CRC32C.new(f.payload).value_as_bytes
def _unittest_frame_compile_message() -> None:
from pyuavcan.transport import Priority, MessageDataSpecifier, Timestamp
f = SerialFrame(timestamp=Timestamp.now(),
priority=Priority.HIGH,
source_node_id=SerialFrame.FRAME_DELIMITER_BYTE,
destination_node_id=SerialFrame.ESCAPE_PREFIX_BYTE,
data_specifier=MessageDataSpecifier(12345),
data_type_hash=0xdead_beef_bad_c0ffe,
transfer_id=1234567890123456789,
index=1234567,
end_of_transfer=True,
payload=memoryview(b'abcd\x9Eef\x8E'))
buffer = bytearray(0 for _ in range(1000))
mv = f.compile_into(buffer)
assert mv[0] == SerialFrame.FRAME_DELIMITER_BYTE
assert mv[-1] == SerialFrame.FRAME_DELIMITER_BYTE
segment = bytes(mv[1:-1])
def _finalize(self, known_invalid: bool) -> None:
try:
mv = memoryview(self._frame_buffer)
parsed: typing.Optional[SerialFrame] = None
if (not known_invalid) and len(mv) <= self._max_frame_size_bytes:
assert self._current_frame_timestamp is not None
parsed = SerialFrame.parse_from_unescaped_image(mv, self._current_frame_timestamp)
if parsed:
self._callback(parsed)
elif mv:
self._callback(mv)
else:
pass # Empty - nothing to report.
finally:
self._unescape_next = False
self._current_frame_timestamp = None
self._frame_buffer = bytearray() # There are memoryview instances pointing to the old buffer!
def mk_frame(transfer_id: int,
index: int,
end_of_transfer: bool,
payload: typing.Union[bytes, memoryview],
source_node_id: typing.Optional[int]) -> SerialFrame:
return SerialFrame(timestamp=ts,
priority=prio,
transfer_id=transfer_id,
index=index,
end_of_transfer=end_of_transfer,
payload=memoryview(payload),
source_node_id=source_node_id,
destination_node_id=dst_nid,
data_specifier=session_spec.data_specifier,
data_type_hash=payload_meta.data_type_hash)
if int_data_spec & (1 << 14):
role = pyuavcan.transport.ServiceDataSpecifier.Role.RESPONSE
else:
role = pyuavcan.transport.ServiceDataSpecifier.Role.REQUEST
service_id = int_data_spec & pyuavcan.transport.ServiceDataSpecifier.SERVICE_ID_MASK
data_specifier = pyuavcan.transport.ServiceDataSpecifier(service_id, role)
try:
return SerialFrame(timestamp=timestamp,
priority=pyuavcan.transport.Priority(int_priority),
source_node_id=src_nid,
destination_node_id=dst_nid,
data_specifier=data_specifier,
data_type_hash=dt_hash,
transfer_id=transfer_id,
index=index_eot & SerialFrame.INDEX_MASK,
end_of_transfer=index_eot & (1 << 31) != 0,
payload=payload)
except ValueError:
return None
max_payload_size_bytes: int):
"""
:param callback: Invoked when a new frame is parsed or when a block of data could not be recognized as a frame.
In the case of success, an instance of the frame class is passed; otherwise, raw memoryview is passed.
In either case, the referenced memory is guaranteed to be immutable.
:param max_payload_size_bytes: Frames containing more that this many bytes of payload (after escaping and
not including the header and CRC) will be considered invalid.
"""
max_payload_size_bytes = int(max_payload_size_bytes)
if not (callable(callback) and max_payload_size_bytes > 0):
raise ValueError('Invalid parameters')
# Constant configuration
self._callback = callback
self._max_frame_size_bytes = \
int(max_payload_size_bytes) + SerialFrame.NUM_OVERHEAD_BYTES_EXCEPT_DELIMITERS_AND_ESCAPING
# Parser state
self._frame_buffer = bytearray() # Entire frame except delimiters.
self._unescape_next = False
self._current_frame_timestamp: typing.Optional[pyuavcan.transport.Timestamp] = None
payload=memoryview(b'abcdef'))
with raises(ValueError):
SerialFrame(timestamp=Timestamp.now(),
priority=Priority.HIGH,
source_node_id=None,
destination_node_id=456,
data_specifier=ServiceDataSpecifier(123, ServiceDataSpecifier.Role.REQUEST),
data_type_hash=0xdead_beef_bad_c0ffe,
transfer_id=1234567890123456789,
index=1234567,
end_of_transfer=False,
payload=memoryview(b'abcdef'))
with raises(ValueError):
SerialFrame(timestamp=Timestamp.now(),
priority=Priority.HIGH,
source_node_id=None,
destination_node_id=None,
data_specifier=MessageDataSpecifier(12345),
data_type_hash=2 ** 64,
transfer_id=1234567890123456789,
index=1234567,
end_of_transfer=False,
payload=memoryview(b'abcdef'))
with raises(ValueError):
SerialFrame(timestamp=Timestamp.now(),
priority=Priority.HIGH,
source_node_id=None,
destination_node_id=None,
data_specifier=MessageDataSpecifier(12345),