Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Tell the user (on stderr) where the generated DSDL packages are going to live.
print('Generated DSDL packages will be stored in:', dsdl_generated_dir, file=sys.stderr)

# The generated packages are imported below, so their output directory has to be on the module search path.
# IDE users: add the same directory to the IDE's look-up set, otherwise code completion will not see the packages.
sys.path.insert(0, str(dsdl_generated_dir))

# First attempt: import the packages directly. If that fails, run the code generator and import again.
try:
    import sirius_cyber_corp     # Vendor-specific root namespace holding our custom data types.
    import pyuavcan.application  # Depends on the standard data types from the root namespace "uavcan".
except (ImportError, AttributeError):
    # The paths are hard-coded relative to this script for the sake of conciseness.
    script_path = os.path.abspath(os.path.dirname(__file__))
    vendor_namespace_dir = os.path.join(script_path, '../dsdl/namespaces/sirius_cyber_corp/')
    standard_namespace_dir = os.path.join(script_path, '../public_regulated_data_types/uavcan')

    # Vendor-specific namespace. Like most namespaces, it may refer to the standard data types, so the standard
    # root namespace is supplied as a look-up directory.
    pyuavcan.dsdl.generate_package(
        root_namespace_directory=vendor_namespace_dir,
        lookup_directories=[standard_namespace_dir],
        output_directory=dsdl_generated_dir,
    )

    # Standard namespace. The order of the two generator invocations does not matter.
    pyuavcan.dsdl.generate_package(
        root_namespace_directory=standard_namespace_dir,
        output_directory=dsdl_generated_dir,
    )

    # The import machinery caches directory contents, so the cache has to be invalidated before the freshly
    # generated packages become importable; see the docs for importlib.invalidate_caches().
    importlib.invalidate_caches()
    import sirius_cyber_corp
    import pyuavcan.application
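# Import other namespaces we're planning to use. Nested namespaces are not auto-imported, so in order to reach,
# say, "uavcan.node.Heartbeat_1_0", the nested namespace has to be imported explicitly: "import uavcan.node".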
def _unittest_slow_builtin_form_manual(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None:
import uavcan.node
import uavcan.register
import uavcan.primitive.array
import uavcan.time
bi = pyuavcan.dsdl.to_builtin(uavcan.node.Heartbeat_1_0(uptime=123456,
health=2,
mode=6,
vendor_specific_status_code=0xbad))
assert bi == {
'uptime': 123456,
'health': 2,
'mode': 6,
'vendor_specific_status_code': 2989,
}
bi = pyuavcan.dsdl.to_builtin(uavcan.node.GetInfo_1_0.Response(
protocol_version=uavcan.node.Version_1_0(1, 2),
hardware_version=uavcan.node.Version_1_0(3, 4),
software_version=uavcan.node.Version_1_0(5, 6),
software_vcs_revision_id=0xbadc0ffee0ddf00d,
unique_id=b'0123456789abcdef',
def construct_subsystem(self, args: argparse.Namespace) -> object:
"""
We use object instead of Node because the Node class requires generated code to be generated.
"""
from pyuavcan import application
node_info = pyuavcan.dsdl.update_from_builtin(application.NodeInfo(), args.node_info_fields)
_logger.debug('Node info: %r', node_info)
transport = self._transport_factory.construct_subsystem(args)
presentation = pyuavcan.presentation.Presentation(transport)
node = application.Node(presentation, info=node_info)
try:
# Configure the heartbeat publisher.
if args.heartbeat_fields.pop('uptime', None) is not None:
_logger.warning('Specifying uptime has no effect because it will be overridden by the node.')
node.heartbeat_publisher.health = \
args.heartbeat_fields.pop('health', application.heartbeat_publisher.Health.NOMINAL)
node.heartbeat_publisher.mode = \
args.heartbeat_fields.pop('mode', application.heartbeat_publisher.Mode.OPERATIONAL)
node.heartbeat_publisher.vendor_specific_status_code = args.heartbeat_fields.pop(
'vendor_specific_status_code',
os.getpid() & (2 ** min(pyuavcan.dsdl.get_model(application.heartbeat_publisher.Heartbeat)
async def get_server_with_fixed_service_id(self, cls: typing.Type[FixedPortServiceClass]) -> None:
    """
    Convenience wrapper over :meth:`get_server` for service types that carry a fixed port-ID.
    The service-ID is obtained from the data type itself via :func:`pyuavcan.dsdl.get_fixed_port_id`
    and forwarded, together with the type, to :meth:`get_server`.
    """
    # NOTE(review): the signature is annotated "-> None", yet the awaited result of get_server() is returned.
    # Confirm the intended return type against get_server().
    fixed_service_id = pyuavcan.dsdl.get_fixed_port_id(cls)
    server = await self.get_server(cls=cls, service_id=fixed_service_id)
    return server
async def publish(self, message: DataTypeClass) -> None:
    """
    Serialize the message object and publish it through the underlying transport-level publisher
    using the priority and loopback settings stored on this instance.
    """
    serialized = pyuavcan.dsdl.serialize(message)
    # noinspection PyCallByClass
    outgoing = pyuavcan.transport.Publisher.Transfer(
        priority=self._priority,
        transfer_id=0,  # TODO
        fragmented_payload=serialized,
        loopback=self._loopback,
    )
    await self._tlp.publish(outgoing)
async def _do_send_until(self,
                         request: pyuavcan.dsdl.CompositeObject,
                         transfer_id: int,
                         priority: pyuavcan.transport.Priority,
                         monotonic_deadline: float) -> bool:
    """
    Serialize the request object, wrap it into a timestamped transfer, and pass it to the output transport
    session together with the deadline. The result of the session's ``send_until()`` is returned as-is.

    :raises TypeError: If the request is not an instance of the request type of this client's service type.
    """
    if not isinstance(request, self.dtype.Request):
        raise TypeError(f'Invalid request object: expected an instance of {self.dtype.Request}, '
                        f'got {type(request)} instead.')

    # Keyword arguments are evaluated left to right, so the timestamp is still sampled before serialization.
    outgoing = pyuavcan.transport.Transfer(
        timestamp=pyuavcan.transport.Timestamp.now(),
        priority=priority,
        transfer_id=transfer_id,
        fragmented_payload=list(pyuavcan.dsdl.serialize(request)),
    )
    return await self.output_transport_session.send_until(outgoing, monotonic_deadline)
Creates a new client instance for the specified service-ID and the remote server node-ID.
The number of such instances can be arbitrary.
For example, different tasks may simultaneously create and use client instances
invoking the same service on the same server node.
All clients created with a specific combination of service-ID and server node-ID share the same
underlying implementation object which is hidden from the user.
The implementation instance is reference counted and it is destroyed automatically along with its
underlying transport level session instances when its last client is closed.
The client instance will be closed automatically from its finalizer when garbage
collected if the user did not bother to do that manually.
This logic follows the RAII pattern.
See :class:`Client` for further information about clients.
"""
if not issubclass(dtype, pyuavcan.dsdl.ServiceObject):
raise TypeError(f'Not a service type: {dtype}')
self._raise_if_closed()
def transfer_id_modulo_factory() -> int:
return self._transport.protocol_parameters.transfer_id_modulo
input_session_specifier = pyuavcan.transport.InputSessionSpecifier(
pyuavcan.transport.ServiceDataSpecifier(service_id, pyuavcan.transport.ServiceDataSpecifier.Role.RESPONSE),
server_node_id
)
output_session_specifier = pyuavcan.transport.OutputSessionSpecifier(
pyuavcan.transport.ServiceDataSpecifier(service_id, pyuavcan.transport.ServiceDataSpecifier.Role.REQUEST),
server_node_id
)
try:
async def _do_send_until(response: ServiceResponseClass,
                         metadata: ServiceRequestMetadata,
                         session: pyuavcan.transport.OutputSession,
                         monotonic_deadline: float) -> bool:
    """
    Serialize the response object and send it over the given output session before the deadline.
    The priority and the transfer-ID of the outgoing transfer are copied from the request metadata.
    The result of the session's ``send_until()`` is returned as-is.
    """
    sent_at = pyuavcan.transport.Timestamp.now()
    payload_fragments = list(pyuavcan.dsdl.serialize(response))
    reply = pyuavcan.transport.Transfer(
        timestamp=sent_at,
        priority=metadata.priority,
        transfer_id=metadata.transfer_id,
        fragmented_payload=payload_fragments,
    )
    return await session.send_until(reply, monotonic_deadline)