Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
print('PROMISCUOUS SUBSCRIBER TRANSFER:', rx_transfer)
assert isinstance(rx_transfer, TransferFrom)
assert rx_transfer.priority == Priority.LOW
assert rx_transfer.transfer_id == 77777
assert rx_transfer.fragmented_payload == [b''.join(payload_single)]
print('tr :', tr.sample_statistics())
assert tr.sample_statistics().demultiplexer[
MessageDataSpecifier(12345)
].accepted_datagrams == {222: 1}
assert tr.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST)
].accepted_datagrams == {}
print('tr2:', tr2.sample_statistics())
assert tr2.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE)
].accepted_datagrams == {}
assert None is await subscriber_selective.receive_until(get_monotonic() + 0.1)
assert None is await subscriber_promiscuous.receive_until(get_monotonic() + 0.1)
assert None is await server_listener.receive_until(get_monotonic() + 0.1)
assert None is await client_listener.receive_until(get_monotonic() + 0.1)
#
# Service exchange test.
#
assert await client_requester.send_until(
Transfer(timestamp=Timestamp.now(),
priority=Priority.HIGH,
transfer_id=88888,
fragmented_payload=payload_x3),
monotonic_deadline=get_monotonic() + 5.0
# Instantiate session objects.
#
meta = PayloadMetadata(0x_bad_c0ffee_0dd_f00d, 10000)
broadcaster = tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert broadcaster is tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None),
meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
client_requester = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), 3210), meta)
assert client_requester is tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), 3210), meta)
client_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 3210), meta)
assert client_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 3210), meta)
print('INPUTS:', tr.input_sessions)
print('OUTPUTS:', tr.output_sessions)
assert set(tr.input_sessions) == {subscriber_promiscuous, subscriber_selective, server_listener, client_listener}
int(Priority.LOW),
0x01, 0x00,
0x00, 0x00,
0x10, 0x80, # Request, service ID 16
0x0D, 0xF0, 0xDD, 0xE0, 0xFE, 0x0F, 0xDC, 0xBA,
0xD2, 0x0A, 0x1F, 0xEB, 0x8C, 0xA9, 0x54, 0xAB,
0x31, 0xD4, 0x00, 0x00,
])
header += get_crc(header)
assert len(header) == 32
f = SerialFrame.parse_from_unescaped_image(memoryview(header + get_crc(b'')), ts)
assert f == SerialFrame(
priority=Priority.LOW,
source_node_id=1,
destination_node_id=0,
data_specifier=ServiceDataSpecifier(16, ServiceDataSpecifier.Role.REQUEST),
data_type_hash=0xbad_c0ffee_0dd_f00d,
transfer_id=12345678901234567890,
index=54321,
end_of_transfer=False,
payload=memoryview(b''),
timestamp=ts,
)
# Too short
assert SerialFrame.parse_from_unescaped_image(memoryview(header[1:] + get_crc(payload)), ts) is None
# Bad CRC
assert SerialFrame.parse_from_unescaped_image(memoryview(header + payload + b'1234'), ts) is None
# Bad version
header = bytes([
self._specifier = specifier
self._payload_metadata = payload_metadata
self._sft_payload_capacity_bytes = int(sft_payload_capacity_bytes)
self._local_node_id_accessor = local_node_id_accessor
self._send_handler = send_handler
self._feedback_handler: typing.Optional[typing.Callable[[pyuavcan.transport.Feedback], None]] = None
self._statistics = pyuavcan.transport.SessionStatistics()
assert callable(self._local_node_id_accessor)
assert callable(send_handler)
if not isinstance(self._specifier, pyuavcan.transport.SessionSpecifier) or \
not isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata): # pragma: no cover
raise TypeError('Invalid parameters')
if isinstance(specifier.data_specifier, pyuavcan.transport.ServiceDataSpecifier):
is_response = specifier.data_specifier.role == pyuavcan.transport.ServiceDataSpecifier.Role.RESPONSE
if is_response and specifier.remote_node_id is None:
raise pyuavcan.transport.UnsupportedSessionConfigurationError(
f'Cannot broadcast a service response. Session specifier: {specifier}')
super(SerialOutputSession, self).__init__(finalizer)
_HEADER_WITHOUT_CRC_FORMAT.unpack_from(header)
if version != _VERSION:
return None
src_nid = None if src_nid == _ANONYMOUS_NODE_ID else src_nid
dst_nid = None if dst_nid == _ANONYMOUS_NODE_ID else dst_nid
data_specifier: pyuavcan.transport.DataSpecifier
if int_data_spec & (1 << 15) == 0:
data_specifier = pyuavcan.transport.MessageDataSpecifier(int_data_spec)
else:
if int_data_spec & (1 << 14):
role = pyuavcan.transport.ServiceDataSpecifier.Role.RESPONSE
else:
role = pyuavcan.transport.ServiceDataSpecifier.Role.REQUEST
service_id = int_data_spec & pyuavcan.transport.ServiceDataSpecifier.SERVICE_ID_MASK
data_specifier = pyuavcan.transport.ServiceDataSpecifier(service_id, role)
try:
return SerialFrame(timestamp=timestamp,
priority=pyuavcan.transport.Priority(int_priority),
source_node_id=src_nid,
destination_node_id=dst_nid,
data_specifier=data_specifier,
data_type_hash=dt_hash,
transfer_id=transfer_id,
index=index_eot & SerialFrame.INDEX_MASK,
end_of_transfer=index_eot & (1 << 31) != 0,
payload=payload)
except ValueError:
return None
frames=2,
payload_bytes=11,
errors=0,
drops=0
)
assert sos.socket.fileno() >= 0
assert not finalized
sos.close()
assert finalized
assert sos.socket.fileno() < 0 # The socket is supposed to be disposed of.
finalized = False
# Multi-frame with multiplication
sos = UDPOutputSession(
specifier=OutputSessionSpecifier(ServiceDataSpecifier(321, ServiceDataSpecifier.Role.REQUEST), 2222),
payload_metadata=PayloadMetadata(0xdead_beef_badc0ffe, 1024),
mtu=10,
multiplier=2,
sock=make_sock(),
loop=asyncio.get_event_loop(),
finalizer=do_finalize,
)
assert run_until_complete(sos.send_until(
Transfer(timestamp=ts,
priority=Priority.OPTIONAL,
transfer_id=54321,
fragmented_payload=[memoryview(b'one'), memoryview(b'two'), memoryview(b'three')]),
loop.time() + 10.0
))
data_main_a, endpoint = sock_rx.recvfrom(1000)
assert endpoint[0] == '127.100.0.2'