Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _unittest_slow_presentation_rpc(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo],
                                          transport_factory: TransportFactory) -> None:
    """
    Build one ``uavcan.register.Access`` server and three clients on top of two presentation instances and check
    how the session objects are shared.

    Asking for the fixed-ID server twice must return the very same object. Every client is a distinct proxy;
    the two proxies addressed to node 123 must share one underlying implementation whose proxy count is two,
    while the proxy addressed to node 111 must use a different implementation.
    """
    assert generated_packages
    import uavcan.register
    import uavcan.primitive
    import uavcan.time
    from pyuavcan.transport import Priority, Timestamp

    server_node_id = 123
    client_node_id = 42

    transport_a, transport_b, _ = transport_factory(server_node_id, client_node_id)
    assert transport_a.local_node_id == server_node_id
    assert transport_b.local_node_id == client_node_id

    presentation_a = pyuavcan.presentation.Presentation(transport_a)
    presentation_b = pyuavcan.presentation.Presentation(transport_b)
    assert presentation_a.transport is transport_a

    access_type = uavcan.register.Access_1_0

    # The server lives on node A; a repeated request must not create a second instance.
    server = presentation_a.get_server_with_fixed_service_id(access_type)
    assert server is presentation_a.get_server_with_fixed_service_id(access_type)

    # All clients live on node B. Two of them address node A; the third addresses node-ID 111,
    # which is neither of the two node-IDs passed to the transport factory above.
    client0 = presentation_b.make_client_with_fixed_service_id(access_type, server_node_id)
    client1 = presentation_b.make_client_with_fixed_service_id(access_type, server_node_id)
    client_dead = presentation_b.make_client_with_fixed_service_id(access_type, 111)

    assert client0 is not client1
    shared_impl = client0._maybe_impl
    assert shared_impl is not None
    assert client1._maybe_impl is not None
    assert shared_impl is client1._maybe_impl
    assert shared_impl is not client_dead._maybe_impl
    assert shared_impl.proxy_count == 2
async def _unittest_slow_plug_and_play_allocatee(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) \
        -> None:
    """
    Feed three allocation responses to an anonymous allocatee over a mock CAN bus and check its result after each.

    The first response carries a different unique-ID and the second carries node-ID 999; after either one the
    allocatee must still report no result. The third carries the allocatee's own unique-ID with node-ID 0,
    after which the allocatee must report 0.
    """
    from pyuavcan.application.plug_and_play import Allocatee, NodeIDAllocationData_2, ID

    assert generated_packages

    bus: typing.Set[MockMedia] = set()
    anonymous_side = Presentation(CANTransport(MockMedia(bus, 64, 1), None))
    responder_side = Presentation(CANTransport(MockMedia(bus, 64, 1), 123))

    allocatee = Allocatee(anonymous_side, _uid('00112233445566778899aabbccddeeff'), 42)
    allocatee.start()
    response_pub = responder_side.make_publisher_with_fixed_subject_id(NodeIDAllocationData_2)

    async def respond(node_id: int, unique_id_hex: str) -> None:
        """Publish one allocation response and give the allocatee a second to process it."""
        await response_pub.publish(NodeIDAllocationData_2(ID(node_id), unique_id=_uid(unique_id_hex)))
        await asyncio.sleep(1.0)

    # Unique-ID mismatch.
    await respond(10, 'aabbccddeeff00112233445566778899')
    assert allocatee.get_result() is None

    # Matching unique-ID but a bad node-ID.
    await respond(999, '00112233445566778899aabbccddeeff')
    assert allocatee.get_result() is None

    # Correct response.
    await respond(0, '00112233445566778899aabbccddeeff')
    assert allocatee.get_result() == 0
async def _unittest_slow_plug_and_play_centralized(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo],
                                                   mtu: int) -> None:
    """
    Exercise the centralized plug-and-play allocator over a mock CAN bus with the given MTU.

    An allocatee started with no allocator present must still have no result after two seconds.
    Constructing an allocator on the anonymous presentation, or with the unique-ID ``b'123'``,
    must raise ValueError. Finally an allocator is constructed on the presentation with node-ID 123.
    """
    from pyuavcan.application.plug_and_play import CentralizedAllocator, Allocatee

    assert generated_packages

    bus: typing.Set[MockMedia] = set()
    anonymous_side = Presentation(CANTransport(MockMedia(bus, mtu, 1), None))
    allocator_side = Presentation(CANTransport(MockMedia(bus, mtu, 1), 123))

    cln_a = Allocatee(anonymous_side, _uid('00112233445566778899aabbccddeeff'), 42)
    assert cln_a.get_result() is None
    cln_a.start()
    await asyncio.sleep(2.0)
    assert cln_a.get_result() is None  # Nope, no response.

    # Remove the allocation table file if there is one; its absence is not an error.
    try:
        _TABLE.unlink()
    except FileNotFoundError:
        pass

    # Invalid constructions are rejected.
    with pytest.raises(ValueError, match='.*anonymous.*'):
        CentralizedAllocator(anonymous_side, _uid('deadbeefdeadbeefdeadbeefdeadbeef'), _TABLE)
    with pytest.raises(ValueError):
        CentralizedAllocator(anonymous_side, b'123', _TABLE)

    allocator = CentralizedAllocator(allocator_side, _uid('deadbeefdeadbeefdeadbeefdeadbeef'), _TABLE)
# Slow test of NodeTracker: sets up four presentation instances, two update handlers, and a tracker.
# NOTE(review): this listing has lost its leading indentation and stops after the empty-registry check;
# no handler for the `try` below is visible. Restore the body from the upstream file before running it.
async def _unittest_slow_node_tracker(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo],
caplog: typing.Any) -> None:
from . import get_transport
from pyuavcan.presentation import Presentation
from pyuavcan.application.node_tracker import NodeTracker, Entry, GetInfo, Heartbeat
assert generated_packages
# Three presentations built from get_transport(0xA/0xB/0xC) -- presumably the argument is the node-ID -- and
# one built from get_transport(None), which is the one handed to the tracker below.
p_a = Presentation(get_transport(0xA))
p_b = Presentation(get_transport(0xB))
p_c = Presentation(get_transport(0xC))
p_trk = Presentation(get_transport(None))
try:
# Every invocation of simple_handler is recorded here as a (node_id, old, new) tuple.
last_update_args: typing.List[typing.Tuple[int, typing.Optional[Entry], typing.Optional[Entry]]] = []
def simple_handler(node_id: int, old: typing.Optional[Entry], new: typing.Optional[Entry]) -> None:
last_update_args.append((node_id, old, new))
# Always raises. Presumably meant to check that a failing handler does not break the tracker -- TODO confirm.
def faulty_handler(_node_id: int, _old: typing.Optional[Entry], _new: typing.Optional[Entry]) -> None:
raise Exception('INTENDED EXCEPTION')
# A freshly constructed tracker must have an empty registry.
trk = NodeTracker(p_trk)
assert not trk.registry
assert transport.local_node_id == 42 # Yup, the node-ID is configured.
# Populate the node info for use with the Node class. Please see the DSDL definition of uavcan.node.GetInfo.
node_info = uavcan.node.GetInfo_1_0.Response(
# Version of the protocol supported by the library, and hence by our node.
protocol_version=uavcan.node.Version_1_0(*pyuavcan.UAVCAN_SPECIFICATION_VERSION),
# There is a similar field for hardware version, but we don't populate it because it's a software-only node.
software_version=uavcan.node.Version_1_0(major=1, minor=0),
# The name of the local node. Should be a reversed Internet domain name, like a Java package.
name='org.uavcan.pyuavcan.demo.basic_usage',
# We've left the optional fields default-initialized here.
)
# The transport layer is ready; next layer up the protocol stack is the presentation layer. Construct it here.
presentation = pyuavcan.presentation.Presentation(transport)
# The application layer is next -- construct the node instance. It will serve GetInfo requests and publish its
# heartbeat automatically (unless it's anonymous). Read the source code of the Node class for more details.
self._node = pyuavcan.application.Node(presentation, node_info)
# Published heartbeat fields can be configured trivially by assigning them on the heartbeat publisher instance.
self._node.heartbeat_publisher.mode = uavcan.node.Heartbeat_1_0.MODE_OPERATIONAL
# In this example here we assign the local process' PID to the vendor-specific status code (VSSC) and make
# sure that the valid range is not exceeded.
self._node.heartbeat_publisher.vendor_specific_status_code = \
os.getpid() & (2 ** min(pyuavcan.dsdl.get_model(uavcan.node.Heartbeat_1_0)[
'vendor_specific_status_code'].data_type.bit_length_set) - 1)
# Now we can create our session objects as necessary. They can be created or destroyed later at any point
# after initialization. It's not necessary to set everything up during the initialization.
srv_least_squares = self._node.presentation.get_server(sirius_cyber_corp.PerformLinearLeastSquaresFit_1_0, 123)
assert new.heartbeat.vendor_specific_status_code == 0xf00d
assert new.info is None
elif num_events_c == 1:
assert old is not None
assert new is None
assert old.heartbeat.vendor_specific_status_code == 0xf00d
assert old.info is None
else:
assert False
num_events_c += 1
else:
assert False
trk.close()
trk.close() # Idempotency
p_trk = Presentation(get_transport(0xDD))
trk = NodeTracker(p_trk)
trk.add_update_handler(validating_handler)
trk.start()
trk.get_info_timeout = 1.0
trk.get_info_attempts = 2
assert pytest.approx(trk.get_info_timeout) == 1.0
assert trk.get_info_attempts == 2
await asyncio.sleep(2.5)
assert num_events_a == 2
assert num_events_b == 2
assert num_events_c == 0
assert list(trk.registry.keys()) == [0xA, 0xB]
assert 12 >= trk.registry[0xA].heartbeat.uptime >= 8
assert trk.registry[0xA].heartbeat.vendor_specific_status_code == 0xdead
assert trk.registry[0xA].info is not None
# Slow test of NodeTracker: sets up four presentation instances, two update handlers, and a tracker.
# NOTE(review): this listing has lost its leading indentation and stops after the default-timeout check;
# no handler for the `try` below is visible. Restore the body from the upstream file before running it.
async def _unittest_slow_node_tracker(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo],
caplog: typing.Any) -> None:
from . import get_transport
from pyuavcan.presentation import Presentation
from pyuavcan.application.node_tracker import NodeTracker, Entry, GetInfo, Heartbeat
assert generated_packages
# Three presentations built from get_transport(0xA/0xB/0xC) -- presumably the argument is the node-ID -- and
# one built from get_transport(None), which is the one handed to the tracker below.
p_a = Presentation(get_transport(0xA))
p_b = Presentation(get_transport(0xB))
p_c = Presentation(get_transport(0xC))
p_trk = Presentation(get_transport(None))
try:
# Every invocation of simple_handler is recorded here as a (node_id, old, new) tuple.
last_update_args: typing.List[typing.Tuple[int, typing.Optional[Entry], typing.Optional[Entry]]] = []
def simple_handler(node_id: int, old: typing.Optional[Entry], new: typing.Optional[Entry]) -> None:
last_update_args.append((node_id, old, new))
# Always raises. Presumably meant to check that a failing handler does not break the tracker -- TODO confirm.
def faulty_handler(_node_id: int, _old: typing.Optional[Entry], _new: typing.Optional[Entry]) -> None:
raise Exception('INTENDED EXCEPTION')
# A freshly constructed tracker must have an empty registry and the class-default GetInfo timeout.
trk = NodeTracker(p_trk)
assert not trk.registry
assert pytest.approx(trk.get_info_timeout) == trk.DEFAULT_GET_INFO_TIMEOUT
async def _unittest_slow_plug_and_play_allocatee(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) \
        -> None:
    """
    Feed three allocation responses to an anonymous allocatee over a mock CAN bus and check its result after each.

    The first response carries a different unique-ID and the second carries node-ID 999; after either one the
    allocatee must still report no result. The third carries the allocatee's own unique-ID with node-ID 0,
    after which the allocatee must report 0.
    """
    from pyuavcan.application.plug_and_play import Allocatee, NodeIDAllocationData_2, ID

    assert generated_packages

    bus: typing.Set[MockMedia] = set()
    anonymous_side = Presentation(CANTransport(MockMedia(bus, 64, 1), None))
    responder_side = Presentation(CANTransport(MockMedia(bus, 64, 1), 123))

    allocatee = Allocatee(anonymous_side, _uid('00112233445566778899aabbccddeeff'), 42)
    allocatee.start()
    response_pub = responder_side.make_publisher_with_fixed_subject_id(NodeIDAllocationData_2)

    async def respond(node_id: int, unique_id_hex: str) -> None:
        """Publish one allocation response and give the allocatee a second to process it."""
        await response_pub.publish(NodeIDAllocationData_2(ID(node_id), unique_id=_uid(unique_id_hex)))
        await asyncio.sleep(1.0)

    # Unique-ID mismatch.
    await respond(10, 'aabbccddeeff00112233445566778899')
    assert allocatee.get_result() is None

    # Matching unique-ID but a bad node-ID.
    await respond(999, '00112233445566778899aabbccddeeff')
    assert allocatee.get_result() is None

    # Correct response.
    await respond(0, '00112233445566778899aabbccddeeff')
    assert allocatee.get_result() == 0
# Listens to Heartbeat messages on the given transport and removes every node-ID it hears from a set of
# candidate node-IDs. For transports with 2**32 or more node-IDs it prints a random node-ID and returns 0.
# NOTE(review): this listing has lost its leading indentation and is cut off inside the `try` at the end, so
# what happens to the remaining candidates (and the non-trivial return value) is not visible here.
async def _run(transport: pyuavcan.transport.Transport) -> int:
import uavcan.node
node_id_set_cardinality = transport.protocol_parameters.node_id_set_cardinality
if node_id_set_cardinality >= 2 ** 32:
# Special case: for very large sets just pick a random number. Very large sets are only possible with test
# transports such as loopback so it's acceptable. If necessary, later we could develop a more robust solution.
print(random.randint(0, node_id_set_cardinality - 1))
return 0
# Start with every node-ID in [0, node_id_set_cardinality) as a candidate.
candidates = set(range(node_id_set_cardinality))
pres = pyuavcan.presentation.Presentation(transport)
with contextlib.closing(pres):
# Keep listening for twice the maximum heartbeat publication period.
deadline = asyncio.get_event_loop().time() + uavcan.node.Heartbeat_1_0.MAX_PUBLICATION_PERIOD * 2.0
sub = pres.make_subscriber_with_fixed_subject_id(uavcan.node.Heartbeat_1_0)
while asyncio.get_event_loop().time() <= deadline:
# A None result is simply skipped -- presumably it means the deadline expired; TODO confirm.
result = await sub.receive_until(deadline)
if result is not None:
msg, transfer = result
assert isinstance(transfer, pyuavcan.transport.TransferFrom)
_logger.debug('Received %r via %r', msg, transfer)
if transfer.source_node_id is None:
_logger.warning('FYI, the network contains an anonymous node which is publishing Heartbeat. '
'Please contact the vendor and inform them that this behavior is non-compliant. '
'The offending heartbeat message is: %r, transfer: %r', msg, transfer)
else:
try:
# This node-ID has been heard on the network, so it is dropped from the candidates.
candidates.remove(int(transfer.source_node_id))
def construct_subsystem(self, args: argparse.Namespace) -> object:
"""
We use object instead of Node because the Node class requires generated code to be generated.
"""
from pyuavcan import application
node_info = pyuavcan.dsdl.update_from_builtin(application.NodeInfo(), args.node_info_fields)
_logger.debug('Node info: %r', node_info)
transport = self._transport_factory.construct_subsystem(args)
presentation = pyuavcan.presentation.Presentation(transport)
node = application.Node(presentation, info=node_info)
try:
# Configure the heartbeat publisher.
if args.heartbeat_fields.pop('uptime', None) is not None:
_logger.warning('Specifying uptime has no effect because it will be overridden by the node.')
node.heartbeat_publisher.health = \
args.heartbeat_fields.pop('health', application.heartbeat_publisher.Health.NOMINAL)
node.heartbeat_publisher.mode = \
args.heartbeat_fields.pop('mode', application.heartbeat_publisher.Mode.OPERATIONAL)
node.heartbeat_publisher.vendor_specific_status_code = args.heartbeat_fields.pop(
'vendor_specific_status_code',
os.getpid() & (2 ** min(pyuavcan.dsdl.get_model(application.heartbeat_publisher.Heartbeat)
['vendor_specific_status_code'].data_type.bit_length_set) - 1)
)
_logger.debug('Node heartbeat: %r', node.heartbeat_publisher.make_message())
if args.heartbeat_fields: