Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_truncated_64bit(self):
    """Check truncating casts to a 64-bit unsigned integer.

    The expected values correspond to the input reduced modulo 2**64:
    in-range values pass through, out-of-range and negative values wrap.
    """
    uint64_truncated = parser.PrimitiveType(
        parser.PrimitiveType.KIND_UNSIGNED_INT,
        64,
        parser.PrimitiveType.CAST_MODE_TRUNCATED,
    )
    # (input, expected) pairs, verified in this order.
    expectations = (
        (0, 0),                                        # lower bound
        (18446744073709551615, 18446744073709551615),  # 2**64 - 1, upper bound
        (-1, 18446744073709551615),                    # negative wraps to the top
        (18446744073709551617, 1),                     # 2**64 + 1 wraps to 1
        (-18446744073709551615, 1),                    # -(2**64 - 1) wraps to 1
    )
    for raw, expected in expectations:
        self.assertEqual(transport.cast(raw, uint64_truncated), expected)
transport.format_bits(c1._pack(True)),
"00000000 00000000 0"
)
c2 = self.custom_type()
c2.a = 1
self.assertEqual(transport.get_active_union_field(c2), "a")
self.assertEqual(
transport.format_bits(c2._pack(True)),
"00000000 00011110 0"
)
c3 = self.custom_type()
c3.b[0] = 1
c3.b[1] = 3
self.assertEqual(transport.get_active_union_field(c3), "b")
self.assertEqual(
transport.format_bits(c3._pack(False)),
"10000000 10000001 1"
)
def custom_type_factory(*args, **kwargs):
    """Create a CompoundValue of ``self.custom_type``, forwarding all arguments."""
    return transport.CompoundValue(self.custom_type, *args, **kwargs)
self.custom_type._instantiate = custom_type_factory
def test_representation(self):
    """Fill three array values and verify the packed bits of the first one."""
    a1 = transport.ArrayValue(self.a1_type)
    a2 = transport.ArrayValue(self.a2_type)
    a3 = transport.ArrayValue(self.a3_type)

    # Primitive arrays: every element holds its own index.
    for slot in range(4):
        a1[slot] = slot
    for slot in range(2):
        a2[slot] = slot

    # Compound array: fields a and b hold the index, c gets five alternating bits.
    for slot in range(2):
        a3[slot].a = slot
        a3[slot].b = slot
        for step in range(5):
            a3[slot].c.append(step & 1)
        self.assertEqual(len(a3[slot].c), 5)

    expected_a1_bits = "00000000 00000001 00000010 00000011"
    self.assertEqual(transport.format_bits(a1._pack(False)), expected_a1_bits)
def test_empty(self):
    """An empty byte buffer is expected to convert to an empty bit string."""
    no_bytes = bytearray(b"")
    self.assertEqual(transport.bits_from_bytes(no_bytes), "")
def setUp(self):
    """Build the CAN frames of a KeyValue broadcast and a mock driver that replays them."""
    key_value = uavcan.protocol.debug.KeyValue()
    key_value.key = 'foo'
    key_value.value = 42

    outgoing = transport.Transfer(
        payload=key_value,
        source_node_id=42,
        transfer_id=10,
        transfer_priority=DEFAULT_TRANSFER_PRIORITY,
        service_not_message=False,
    )

    # Wrap every transport frame of the transfer in an extended-ID CAN frame.
    can_frames = []
    for transport_frame in outgoing.to_frames():
        can_frames.append(
            CANFrame(can_id=transport_frame.message_id,
                     data=transport_frame.bytes,
                     extended=True)
        )
    self.frames = can_frames

    # The mocked receive() is fed the frames in order, followed by a final None.
    self.driver = Mock()
    self.driver.receive.side_effect = self.frames + [None]
def test_partial_byte(self):
    """Bit strings shorter than one byte are expected to come back unchanged."""
    short_patterns = (
        "0", "1", "100", "001", "1001", "01001", "001001", "1001001",
    )
    for pattern in short_patterns:
        converted = transport.be_from_le_bits(pattern, len(pattern))
        self.assertEqual(converted, pattern)
:param node_status_interval: NodeStatus broadcasting interval. Defaults to DEFAULT_NODE_STATUS_INTERVAL.
:param mode: Initial operating mode (INITIALIZATION, OPERATIONAL, etc.); defaults to INITIALIZATION.
:param node_info: Structure of type uavcan.protocol.GetNodeInfo.Response, responded with when the local
node is queried for its node info.
"""
super(Node, self).__init__()
self._handler_dispatcher = HandlerDispatcher(self)
self._can_driver = can_driver
self._node_id = node_id
self._transfer_manager = transport.TransferManager()
self._outstanding_requests = {}
self._outstanding_request_callbacks = {}
self._next_transfer_ids = collections.defaultdict(int)
self.start_time_monotonic = time.monotonic()
# Hooks
self._transfer_hook_dispatcher = TransferHookDispatcher()
# NodeStatus publisher
self.health = uavcan.protocol.NodeStatus().HEALTH_OK # @UndefinedVariable
self.mode = uavcan.protocol.NodeStatus().MODE_INITIALIZATION if mode is None else mode # @UndefinedVariable
self.vendor_specific_status_code = 0
node_status_interval = node_status_interval or DEFAULT_NODE_STATUS_INTERVAL
self.periodic(node_status_interval, self._send_node_status)
def respond(self, payload, dest_node_id, transfer_id, priority):
    """Send a service response transfer to another node.

    :param payload: Response structure to be carried by the transfer.
    :param dest_node_id: Node ID the response is addressed to.
    :param transfer_id: Transfer ID to put on the response.
    :param priority: Transfer priority passed to the transport layer.

    Calls ``self._throw_if_anonymous()`` first (presumably raises when the
    local node has no node ID -- confirm against its definition).
    """
    self._throw_if_anonymous()

    # service_not_message=True with request_not_response=False marks a service response.
    transfer = transport.Transfer(
        payload=payload,
        source_node_id=self._node_id,
        dest_node_id=dest_node_id,
        transfer_id=transfer_id,
        transfer_priority=priority,
        service_not_message=True,
        request_not_response=False
    )

    # Let registered hooks observe the outgoing transfer before it hits the bus.
    self._transfer_hook_dispatcher.call_hooks(self._transfer_hook_dispatcher.TRANSFER_DIRECTION_OUTGOING, transfer)

    for frame in transfer.to_frames():
        self._can_driver.send(frame.message_id, frame.bytes, extended=True)

    # Each placeholder needs its own positional index; the previous string reused
    # {0} for all three numbers and printed transfer_id in place of the payload.
    logger.debug("Node.respond(dest_node_id={0:d}, transfer_id={1:d}, priority={2:d}): sent {3!r}"
                 .format(dest_node_id, transfer_id, priority, payload))
def _recv_frame(self, raw_frame):
if not raw_frame.extended:
return
frame = transport.Frame(raw_frame.id, raw_frame.data, raw_frame.ts_monotonic, raw_frame.ts_real)
transfer_frames = self._transfer_manager.receive_frame(frame)
if not transfer_frames:
return
transfer = transport.Transfer()
transfer.from_frames(transfer_frames)
self._transfer_hook_dispatcher.call_hooks(self._transfer_hook_dispatcher.TRANSFER_DIRECTION_INCOMING, transfer)
if (transfer.service_not_message and not transfer.request_not_response) and \
transfer.dest_node_id == self._node_id:
# This is a reply to a request we sent. Look up the original request and call the appropriate callback
requests = self._outstanding_requests.keys()
for key in requests:
if transfer.is_response_to(self._outstanding_requests[key]):