Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _unittest_slow_random(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo],
                          caplog: typing.Any) -> None:
    """
    Slow test that walks over every model of every generated package.

    Each non-service model is passed to ``_test_type`` together with ``_NUM_RANDOM_SAMPLES``,
    and the returned statistics are stored per model.
    For service types the test only checks that attempts to serialize or deserialize
    the service class itself are rejected with ``TypeError``.
    """
    _logger.info(f'Number of random samples: {_NUM_RANDOM_SAMPLES}. '
                 f'Set the environment variable PYUAVCAN_TEST_NUM_RANDOM_SAMPLES to override.')

    # Faulty data is generated on purpose here, and it makes the library log a lot.
    # Raise the threshold for the library logger so that the test output stays readable.
    caplog.set_level(logging.WARNING, logger='pyuavcan.dsdl')

    performance: typing.Dict[pydsdl.CompositeType, _TypeTestStatistics] = {}
    for package in generated_packages:
        for model in _util.expand_service_types(package.models, keep_services=True):
            if isinstance(model, pydsdl.ServiceType):
                # The service type itself must be refused in both directions.
                service_class = pyuavcan.dsdl.get_class(model)
                with pytest.raises(TypeError):
                    assert list(pyuavcan.dsdl.serialize(service_class()))
                with pytest.raises(TypeError):
                    pyuavcan.dsdl.deserialize(service_class, [memoryview(b'')])
                continue

            performance[model] = _test_type(model, _NUM_RANDOM_SAMPLES)

    _logger.info('Tested types ordered by serialization speed, %d random samples per type', _NUM_RANDOM_SAMPLES)
    _logger.info('Columns: random SR correctness ratio; '
                 'mean serialization time [us]; mean deserialization time [us]')
def _are_close(data_type: pydsdl.SerializableType, a: typing.Any, b: typing.Any) -> bool:
"""
If you ever decided to copy-paste this test function into a production application,
beware that it evaluates (NaN == NaN) as True. This is what we want when testing,
but this is not what most real systems expect.
"""
if a is None or b is None: # These occur, for example, in unions
return (a is None) == (b is None)
elif isinstance(data_type, pydsdl.CompositeType):
if type(a) != type(b): # pragma: no cover
return False
for f in pyuavcan.dsdl.get_type(a).fields_except_padding: # pragma: no cover
if not _are_close(f.data_type,
pyuavcan.dsdl.get_attribute(a, f.name),
pyuavcan.dsdl.get_attribute(b, f.name)):
return False
return True # Empty objects of same type compare equal
elif isinstance(data_type, pydsdl.ArrayType):
return all(starmap(partial(_are_close, data_type.element_type), zip(a, b))) \
if len(a) == len(b) and a.dtype == b.dtype else False
elif isinstance(data_type, pydsdl.FloatType):
t = {
16: numpy.float16,
def are_close(model: pydsdl.SerializableType, a: typing.Any, b: typing.Any) -> bool:
    """
    Test helper that recursively compares two values whose structure is described by ``model``.

    Warning: NaN is considered equal to NaN here. That is exactly what the tests need,
    but most real systems expect otherwise, so do not reuse this function in a production
    application without keeping that in mind.
    """
    # A None on either side (this happens with unions, for example) matches only another None.
    if a is None or b is None:
        return (a is None) == (b is None)

    if isinstance(model, pydsdl.CompositeType):
        if type(a) != type(b):  # pragma: no cover
            return False
        # Compare field by field; objects of the same type without fields compare equal.
        return all(  # pragma: no cover
            are_close(field.data_type,
                      pyuavcan.dsdl.get_attribute(a, field.name),
                      pyuavcan.dsdl.get_attribute(b, field.name))
            for field in pyuavcan.dsdl.get_model(a).fields_except_padding
        )

    if isinstance(model, pydsdl.ArrayType):
        if len(a) != len(b) or a.dtype != b.dtype:  # pragma: no cover
            return False
        element_model = model.element_type
        if isinstance(element_model, pydsdl.PrimitiveType):
            # Vectorized path: a speedup for large arrays like images or point clouds.
            return bool(numpy.allclose(a, b, equal_nan=True))
        return all(are_close(element_model, x, y) for x, y in zip(a, b))
source = dict(source) # Create copy to prevent mutation of the original
if not isinstance(destination, CompositeObject): # pragma: no cover
raise TypeError(f'Bad destination: expected a CompositeObject, got {type(destination).__name__}')
model = get_model(destination)
_raise_if_service_type(model)
for f in model.fields_except_padding:
field_type = f.data_type
try:
value = source.pop(f.name)
except LookupError:
continue # No value specified, keep original value
if isinstance(field_type, pydsdl.CompositeType):
field_obj = get_attribute(destination, f.name)
if field_obj is None: # Oh, this is a union
field_obj = get_class(field_type)() # The variant was not selected, construct a default
set_attribute(destination, f.name, field_obj) # Switch the union to the new variant
update_from_builtin(field_obj, value)
elif isinstance(field_type, pydsdl.ArrayType):
element_type = field_type.element_type
if isinstance(element_type, pydsdl.PrimitiveType):
set_attribute(destination, f.name, value)
elif isinstance(element_type, pydsdl.CompositeType):
dtype = get_class(element_type)
set_attribute(destination, f.name, [update_from_builtin(dtype(), s) for s in value])
else:
assert False, f'Unexpected array element type: {element_type!r}'
def get_model(
    class_or_instance: typing.Union[typing.Type[CompositeObject], CompositeObject],
) -> pydsdl.CompositeType:
    """
    Returns the PyDSDL model that belongs to a DSDL-generated class.
    Either the class itself or one of its instances may be passed.
    This function is the inverse of :func:`get_class`.
    """
    # The model is kept in a protected attribute of the generated class.
    # noinspection PyProtectedMember
    model = class_or_instance._MODEL_
    assert isinstance(model, pydsdl.CompositeType)
    return model
import pydsdl
from . import _serialized_representation
_logger = logging.getLogger(__name__)
class CompositeObject(abc.ABC):  # Members are surrounded with underscores to avoid collisions with DSDL attributes.
    """
    This is the base class for all Python classes generated from DSDL definitions.
    It does not have any public members.
    """

    # Type definition as provided by PyDSDL.
    # Only declared here; no value is assigned in this base class.
    _MODEL_: pydsdl.CompositeType

    # Defined in generated classes.
    # NOTE(review): judging by the name, this is the upper bound of the serialized size in bytes -- confirm.
    _MAX_SERIALIZED_REPRESENTATION_SIZE_BYTES_: int

    @abc.abstractmethod
    def _serialize_aligned_(self, _ser_: _serialized_representation.Serializer) -> None:
        """
        Auto-generated serialization method.
        Appends the serialized representation of its object to the supplied Serializer instance.
        The current bit offset of the Serializer instance MUST be byte-aligned.
        This is not a part of the API.

        :param _ser_: The Serializer instance that receives the serialized representation.
        :raises NotImplementedError: Always, when this abstract base implementation is invoked.
        """
        raise NotImplementedError
@staticmethod
@abc.abstractmethod
def _to_builtin_impl(obj: typing.Union[CompositeObject, numpy.ndarray, str, bool, int, float],
                     model: pydsdl.SerializableType) \
        -> typing.Union[typing.Dict[str, typing.Any], typing.List[typing.Any], str, bool, int, float]:
    """
    Recursively converts ``obj``, whose structure is described by ``model``, into built-in Python containers.

    - A composite object becomes a dict keyed by field name. Padding fields are skipped,
      and so are fields whose value is None (this hides inactive union variants).
    - A string-like array becomes a ``str`` if its bytes can be decoded; otherwise a list of ints.
    - Any other array becomes a list whose elements are converted recursively.

    :param obj:   The value to convert; it must match ``model``.
    :param model: The PyDSDL type that describes ``obj``.
    :return:      The built-in representation of ``obj``.
    """
    if isinstance(model, pydsdl.CompositeType):
        assert isinstance(obj, CompositeObject)
        out: typing.Dict[str, typing.Any] = {}
        for f in model.fields_except_padding:
            # Look the attribute up once: it is needed for the None check and for the conversion.
            value = get_attribute(obj, f.name)
            if value is not None:  # The check is to hide inactive union variants.
                out[f.name] = _to_builtin_impl(value, f.data_type)
        return out

    elif isinstance(model, pydsdl.ArrayType):
        assert isinstance(obj, numpy.ndarray)
        if model.string_like:  # TODO: drop this special case when strings are natively supported in DSDL.
            try:
                return bytes(e for e in obj).decode()
            except UnicodeError:
                # Not valid text, so fall back to the raw element values.
                return list(map(int, obj))
        else:
            element_model = model.element_type  # Loop-invariant, so it is read once for the whole array.
            return [_to_builtin_impl(e, element_model) for e in obj]
# Absolute path of the ``_templates`` directory that sits next to this module.
_TEMPLATE_DIRECTORY: pathlib.Path = pathlib.Path(__file__).absolute().parent / '_templates'

_OUTPUT_FILE_PERMISSIONS = 0o444
"""
The files are made read-only for everyone: they are autogenerated, so nobody should edit them by hand.
"""
# Immutable record with three fields that describe a generated package: path, models, and name.
@dataclasses.dataclass(frozen=True)
class GeneratedPackageInfo:
    path: pathlib.Path
    """
    Location of the directory where the top-level ``__init__.py`` resides.
    """

    models: typing.List[pydsdl.CompositeType]
    """
    The PyDSDL objects that describe the source DSDL definitions.
    They are suitable for arbitrarily complex introspection and reflection.
    """

    name: str
    """
    Name of the generated package. It equals the name of the DSDL root namespace,
    except when that name had to be stropped; see ``nunavut.lang.py.PYTHON_RESERVED_IDENTIFIERS``.
    """
def generate_package(root_namespace_directory: _AnyPath,
lookup_directories: typing.Optional[typing.List[_AnyPath]] = None,
output_directory: typing.Optional[_AnyPath] = None,
allow_unregulated_fixed_port_id: bool = False) -> GeneratedPackageInfo: