How to use the pydsdl.CompositeType function in pydsdl

To help you get started, we’ve selected a few pydsdl examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github UAVCAN / pyuavcan / tests / dsdl / _random.py View on Github external
def _unittest_slow_random(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo],
                          caplog:             typing.Any) -> None:
    _logger.info(f'Number of random samples: {_NUM_RANDOM_SAMPLES}. '
                 f'Set the environment variable PYUAVCAN_TEST_NUM_RANDOM_SAMPLES to override.')

    # The random test intentionally generates a lot of faulty data, which generates a lot of log messages.
    # We don't want them to clutter the test output, so we raise the logging level temporarily.
    caplog.set_level(logging.WARNING, logger='pyuavcan.dsdl')

    performance: typing.Dict[pydsdl.CompositeType, _TypeTestStatistics] = {}

    for info in generated_packages:
        for model in _util.expand_service_types(info.models, keep_services=True):
            if not isinstance(model, pydsdl.ServiceType):
                performance[model] = _test_type(model, _NUM_RANDOM_SAMPLES)
            else:
                dtype = pyuavcan.dsdl.get_class(model)
                with pytest.raises(TypeError):
                    assert list(pyuavcan.dsdl.serialize(dtype()))
                with pytest.raises(TypeError):
                    pyuavcan.dsdl.deserialize(dtype, [memoryview(b'')])

    _logger.info('Tested types ordered by serialization speed, %d random samples per type', _NUM_RANDOM_SAMPLES)
    _logger.info('Columns: random SR correctness ratio; '
                 'mean serialization time [us]; mean deserialization time [us]')
github UAVCAN / pyuavcan / tests / dsdl_compiler.py View on Github external
def _are_close(data_type: pydsdl.SerializableType, a: typing.Any, b: typing.Any) -> bool:
    """
    If you ever decided to copy-paste this test function into a production application,
    beware that it evaluates (NaN == NaN) as True. This is what we want when testing,
    but this is not what most real systems expect.
    """
    if a is None or b is None:  # These occur, for example, in unions
        return (a is None) == (b is None)

    elif isinstance(data_type, pydsdl.CompositeType):
        if type(a) != type(b):  # pragma: no cover
            return False
        for f in pyuavcan.dsdl.get_type(a).fields_except_padding:  # pragma: no cover
            if not _are_close(f.data_type,
                              pyuavcan.dsdl.get_attribute(a, f.name),
                              pyuavcan.dsdl.get_attribute(b, f.name)):
                return False
        return True                 # Empty objects of same type compare equal

    elif isinstance(data_type, pydsdl.ArrayType):
        return all(starmap(partial(_are_close, data_type.element_type), zip(a, b))) \
            if len(a) == len(b) and a.dtype == b.dtype else False

    elif isinstance(data_type, pydsdl.FloatType):
        t = {
            16: numpy.float16,
github UAVCAN / pyuavcan / tests / dsdl / _util.py View on Github external
def are_close(model: pydsdl.SerializableType, a: typing.Any, b: typing.Any) -> bool:
    """
    If you ever decided to copy-paste this test function into a production application,
    beware that it evaluates (NaN == NaN) as True. This is what we want when testing,
    but this is not what most real systems expect.
    """
    if a is None or b is None:  # These occur, for example, in unions
        return (a is None) == (b is None)

    elif isinstance(model, pydsdl.CompositeType):
        if type(a) != type(b):  # pragma: no cover
            return False
        for f in pyuavcan.dsdl.get_model(a).fields_except_padding:  # pragma: no cover
            if not are_close(f.data_type,
                             pyuavcan.dsdl.get_attribute(a, f.name),
                             pyuavcan.dsdl.get_attribute(b, f.name)):
                return False
        return True                 # Empty objects of same type compare equal

    elif isinstance(model, pydsdl.ArrayType):
        if len(a) != len(b) or a.dtype != b.dtype:  # pragma: no cover
            return False
        if isinstance(model.element_type, pydsdl.PrimitiveType):
            return bool(numpy.allclose(a, b, equal_nan=True))  # Speedup for large arrays like images or point clouds
        else:
            return all(itertools.starmap(functools.partial(are_close, model.element_type), zip(a, b)))
github UAVCAN / pyuavcan / pyuavcan / dsdl / _builtin_form.py View on Github external
source = dict(source)   # Create copy to prevent mutation of the original

    if not isinstance(destination, CompositeObject):  # pragma: no cover
        raise TypeError(f'Bad destination: expected a CompositeObject, got {type(destination).__name__}')

    model = get_model(destination)
    _raise_if_service_type(model)

    for f in model.fields_except_padding:
        field_type = f.data_type
        try:
            value = source.pop(f.name)
        except LookupError:
            continue    # No value specified, keep original value

        if isinstance(field_type, pydsdl.CompositeType):
            field_obj = get_attribute(destination, f.name)
            if field_obj is None:                               # Oh, this is a union
                field_obj = get_class(field_type)()             # The variant was not selected, construct a default
                set_attribute(destination, f.name, field_obj)   # Switch the union to the new variant
            update_from_builtin(field_obj, value)

        elif isinstance(field_type, pydsdl.ArrayType):
            element_type = field_type.element_type
            if isinstance(element_type, pydsdl.PrimitiveType):
                set_attribute(destination, f.name, value)
            elif isinstance(element_type, pydsdl.CompositeType):
                dtype = get_class(element_type)
                set_attribute(destination, f.name, [update_from_builtin(dtype(), s) for s in value])
            else:
                assert False, f'Unexpected array element type: {element_type!r}'
github UAVCAN / pyuavcan / pyuavcan / dsdl / _composite_object.py View on Github external
def get_model(class_or_instance: typing.Union[typing.Type[CompositeObject], CompositeObject]) -> pydsdl.CompositeType:
    """
    Obtains a PyDSDL model of the supplied DSDL-generated class or its instance.
    This is the inverse of :func:`get_class`.
    """
    # noinspection PyProtectedMember
    out = class_or_instance._MODEL_
    assert isinstance(out, pydsdl.CompositeType)
    return out
github UAVCAN / pyuavcan / pyuavcan / dsdl / _composite_object.py View on Github external
import pydsdl

from . import _serialized_representation


_logger = logging.getLogger(__name__)


class CompositeObject(abc.ABC):  # Members are surrounded with underscores to avoid collisions with DSDL attributes.
    """
    This is the base class for all Python classes generated from DSDL definitions.
    It does not have any public members.
    """

    # Type definition as provided by PyDSDL.
    _MODEL_: pydsdl.CompositeType

    # Defined in generated classes.
    _MAX_SERIALIZED_REPRESENTATION_SIZE_BYTES_: int

    @abc.abstractmethod
    def _serialize_aligned_(self, _ser_: _serialized_representation.Serializer) -> None:
        """
        Auto-generated serialization method.
        Appends the serialized representation of its object to the supplied Serializer instance.
        The current bit offset of the Serializer instance MUST be byte-aligned.
        This is not a part of the API.
        """
        raise NotImplementedError

    @staticmethod
    @abc.abstractmethod
github UAVCAN / pyuavcan / pyuavcan / dsdl / _builtin_form.py View on Github external
def _to_builtin_impl(obj:   typing.Union[CompositeObject, numpy.ndarray, str, bool, int, float],
                     model: pydsdl.SerializableType) \
        -> typing.Union[typing.Dict[str, typing.Any], typing.List[typing.Any], str, bool, int, float]:
    if isinstance(model, pydsdl.CompositeType):
        assert isinstance(obj, CompositeObject)
        return {
            f.name: _to_builtin_impl(get_attribute(obj, f.name), f.data_type)
            for f in model.fields_except_padding
            if get_attribute(obj, f.name) is not None  # The check is to hide inactive union variants.
        }

    elif isinstance(model, pydsdl.ArrayType):
        assert isinstance(obj, numpy.ndarray)
        if model.string_like:  # TODO: drop this special case when strings are natively supported in DSDL.
            try:
                return bytes(e for e in obj).decode()
            except UnicodeError:
                return list(map(int, obj))
        else:
            return [_to_builtin_impl(e, model.element_type) for e in obj]
github UAVCAN / pyuavcan / pyuavcan / dsdl / _compiler.py View on Github external
_TEMPLATE_DIRECTORY: pathlib.Path = pathlib.Path(__file__).absolute().parent / pathlib.Path('_templates')

_OUTPUT_FILE_PERMISSIONS = 0o444
"""
Read-only for all because the files are autogenerated and should not be edited manually.
"""


@dataclasses.dataclass(frozen=True)
class GeneratedPackageInfo:
    path: pathlib.Path
    """
    Path to the directory that contains the top-level ``__init__.py``.
    """

    models: typing.List[pydsdl.CompositeType]
    """
    List of PyDSDL objects describing the source DSDL definitions.
    This can be used for arbitrarily complex introspection and reflection.
    """

    name: str
    """
    The name of the generated package, which is the same as the name of the DSDL root namespace unless
    the name had to be stropped. See ``nunavut.lang.py.PYTHON_RESERVED_IDENTIFIERS``.
    """


def generate_package(root_namespace_directory:        _AnyPath,
                     lookup_directories:              typing.Optional[typing.List[_AnyPath]] = None,
                     output_directory:                typing.Optional[_AnyPath] = None,
                     allow_unregulated_fixed_port_id: bool = False) -> GeneratedPackageInfo: