Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _unittest_slow_random(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo],
                          caplog: typing.Any) -> None:
    """
    Runs the randomized serialization test over every generated data type.

    Non-service models are handed to ``_test_type`` and the returned statistics are collected per model.
    Service models are not tested that way: the test only checks that attempting to serialize
    or deserialize the service class itself raises :class:`TypeError`.

    :param generated_packages: The generated packages whose models are to be tested.
    :param caplog: The log capture fixture; used here only to adjust the log level.
    """
    _logger.info(f'Number of random samples: {_NUM_RANDOM_SAMPLES}. '
                 f'Set the environment variable PYUAVCAN_TEST_NUM_RANDOM_SAMPLES to override.')

    # Random input is mostly invalid on purpose, and every rejection is logged by the library.
    # Raise the threshold for its logger so that the test output stays readable.
    caplog.set_level(logging.WARNING, logger='pyuavcan.dsdl')

    performance: typing.Dict[pydsdl.CompositeType, _TypeTestStatistics] = {}

    for package in generated_packages:
        models = _util.expand_service_types(package.models, keep_services=True)
        for model in models:
            if isinstance(model, pydsdl.ServiceType):
                # The service class itself must be rejected by both directions of the codec.
                service_cls = pyuavcan.dsdl.get_class(model)
                with pytest.raises(TypeError):
                    assert list(pyuavcan.dsdl.serialize(service_cls()))
                with pytest.raises(TypeError):
                    pyuavcan.dsdl.deserialize(service_cls, [memoryview(b'')])
                continue

            performance[model] = _test_type(model, _NUM_RANDOM_SAMPLES)

    _logger.info('Tested types ordered by serialization speed, %d random samples per type', _NUM_RANDOM_SAMPLES)
    _logger.info('Columns: random SR correctness ratio; '
                 'mean serialization time [us]; mean deserialization time [us]')
def _are_close(data_type: pydsdl.SerializableType, a: typing.Any, b: typing.Any) -> bool:
"""
Compares two values of the given DSDL type for approximate equality, recursing into composites and arrays.

If you ever decide to copy-paste this test function into a production application,
beware that it evaluates (NaN == NaN) as True. This is what we want when testing,
but this is not what most real systems expect.
"""
# A missing value matches only another missing value.
if a is None or b is None: # These occur, for example, in unions
return (a is None) == (b is None)
# Composites: the Python types must match, then every non-padding field is compared recursively.
elif isinstance(data_type, pydsdl.CompositeType):
if type(a) != type(b): # pragma: no cover
return False
for f in pyuavcan.dsdl.get_type(a).fields_except_padding: # pragma: no cover
if not _are_close(f.data_type,
pyuavcan.dsdl.get_attribute(a, f.name),
pyuavcan.dsdl.get_attribute(b, f.name)):
return False
return True # Empty objects of same type compare equal
# Arrays: lengths and dtypes must agree, then the elements are compared pairwise.
# NOTE(review): reading .dtype presumably requires both operands to be NumPy arrays - confirm.
elif isinstance(data_type, pydsdl.ArrayType):
return all(starmap(partial(_are_close, data_type.element_type), zip(a, b))) \
if len(a) == len(b) and a.dtype == b.dtype else False
# Floats: a lookup table keyed by bit length selects the matching NumPy float type.
elif isinstance(data_type, pydsdl.FloatType):
t = {
16: numpy.float16,
# NOTE(review): this excerpt stops here, inside the lookup table; the remaining entries and the
# comparison itself are not visible in this copy of the function.
def are_close(model: pydsdl.SerializableType, a: typing.Any, b: typing.Any) -> bool:
    """
    Tells whether two values described by the given DSDL model are approximately equal.

    Warning: this helper is meant for testing only. Primitive arrays are compared with NaN treated
    as equal to NaN, which is convenient for tests but is not what most real systems expect,
    so think twice before reusing it in a production application.

    :param model: The DSDL model that describes both values.
    :param a: The first value; None stands for an absent value.
    :param b: The second value; None stands for an absent value.
    :return: True if the values match. Models other than composites and arrays are not handled
        by the code below, so for them control falls off the end of the function.
    """
    # Absent values appear, for example, in unions; an absent value matches only another absent value.
    if a is None or b is None:
        return a is None and b is None

    if isinstance(model, pydsdl.CompositeType):
        if type(a) != type(b):  # pragma: no cover
            return False
        for field in pyuavcan.dsdl.get_model(a).fields_except_padding:  # pragma: no cover
            lhs = pyuavcan.dsdl.get_attribute(a, field.name)
            rhs = pyuavcan.dsdl.get_attribute(b, field.name)
            if not are_close(field.data_type, lhs, rhs):
                return False
        return True  # Objects of the same type that have no fields are equal

    if isinstance(model, pydsdl.ArrayType):
        if len(a) != len(b) or a.dtype != b.dtype:  # pragma: no cover
            return False
        element_model = model.element_type
        if isinstance(element_model, pydsdl.PrimitiveType):
            # One vectorized comparison; much faster for large arrays such as images or point clouds.
            return bool(numpy.allclose(a, b, equal_nan=True))
        return all(are_close(element_model, x, y) for x, y in zip(a, b))
# Fragment: the header of the enclosing function is not part of this excerpt.
# It serializes `o`, deserializes the result, checks that the round trip preserved the object,
# and returns the two timing measurements stored in `perf_sample`.

# Serialization, timed using the process CPU clock; the elapsed time goes to perf_sample[0].
ts = time.process_time()
sr = pyuavcan.dsdl.serialize(o)
perf_sample[0] = time.process_time() - ts
# Deserialization of the data just produced, timed the same way; the elapsed time goes to perf_sample[1].
ts = time.process_time()
d = pyuavcan.dsdl.try_deserialize(type(o), sr)
perf_sample[1] = time.process_time() - ts
# The result must exist, and must have the same Python type and the same DSDL type as the original.
assert d is not None
assert type(o) is type(d)
assert pyuavcan.dsdl.get_type(o) == pyuavcan.dsdl.get_type(d)
# Approximate value comparison; on failure the message includes both objects and the serialized bytes in hex.
assert _are_close(pyuavcan.dsdl.get_type(o), o, d), f'{o} != {d}; sr: {bytes(sr).hex()}'
# Similar floats may produce drastically different string representations, so if there is at least one float inside,
# we skip the string representation equality check.
if pydsdl.FloatType.__name__ not in repr(pyuavcan.dsdl.get_type(d)):
assert str(o) == str(d)
assert repr(o) == repr(d)
return perf_sample
def _make_random_object(data_type: pydsdl.SerializableType) -> typing.Any:
    """
    Produces a random value for the given DSDL type.

    Booleans, integers, floats, and arrays thereof are handled here. Integers are drawn uniformly from
    the inclusive value range of the type; floats are built from random bit patterns, so infinities and
    NaN may be produced. Types not listed here are not handled by the code below, so for them control
    falls off the end of the function.

    :param data_type: The DSDL type to generate a value for.
    :return: A bool, an int, a float, or a list of such values, depending on the type.
    """
    if isinstance(data_type, pydsdl.BooleanType):
        return random.random() >= 0.5

    if isinstance(data_type, pydsdl.IntegerType):  # noinspection PyTypeChecker
        value_range = data_type.inclusive_value_range
        return random.randint(int(value_range.min), int(value_range.max))

    if isinstance(data_type, pydsdl.FloatType):
        # Every bit pattern of the given width is equally likely, which covers inf and NaN as well:
        # draw an unsigned integer first, then reinterpret its little-endian bytes as a float.
        width = data_type.bit_length
        raw_bits = random.randrange(0, 2 ** width)
        float_code, uint_code = {
            16: ('e', 'H'),
            32: ('f', 'I'),
            64: ('d', 'Q'),
        }[width]
        byte_order = '<'
        packed = struct.pack(byte_order + uint_code, raw_bits)
        return struct.unpack(byte_order + float_code, packed)[0]

    if isinstance(data_type, pydsdl.FixedLengthArrayType):
        # Always filled up to the capacity.
        element_type = data_type.element_type
        return [_make_random_object(element_type) for _ in range(data_type.capacity)]

    if isinstance(data_type, pydsdl.VariableLengthArrayType):
        # Any length from empty up to the capacity, inclusive.
        element_type = data_type.element_type
        count = random.randint(0, data_type.capacity)
        return [_make_random_object(element_type) for _ in range(count)]
def _unittest_slow_builtin_form_automatic(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None:
    """
    Round-trips a random object of every generated type through its built-in representation.

    For each model, a random object is converted with ``to_builtin``, and a fresh instance of the same
    class is populated from the result with ``update_from_builtin``. The ``str`` and ``repr`` of the
    original and of the reconstructed object are then compared.

    Types whose largest serialized representation exceeds 1 MiB are skipped.

    A textual mismatch is a test failure only if the model contains no floats. Similar floats may produce
    different string representations, so for models that mention a float type the mismatch is merely logged
    together with the objects involved.

    :param generated_packages: The generated packages whose models are to be tested.
    """
    for info in generated_packages:
        for model in _util.expand_service_types(info.models):
            if max(model.bit_length_set) / 8 > 1024 * 1024:
                _logger.info('Automatic test of %s skipped because the type is too large', model)
                continue  # Skip large objects because they take forever to convert and test

            obj = _util.make_random_object(model)
            bi = pyuavcan.dsdl.to_builtin(obj)
            reconstructed = pyuavcan.dsdl.update_from_builtin(pyuavcan.dsdl.get_class(model)(), bi)

            if str(obj) != str(reconstructed) or repr(obj) != repr(reconstructed):  # pragma: no branch
                if pydsdl.FloatType.__name__ not in repr(model):  # pragma: no cover
                    # There are no floats anywhere in this type, so the text forms must match exactly.
                    assert False, f'{obj} != {reconstructed}'
                else:
                    # Floats are involved, so the difference may be a mere formatting artifact; report only.
                    _logger.info('Automatic comparison cannot be performed because the objects of type %s may '
                                 'contain floats. Please implement proper DSDL object comparison methods and '
                                 'update this test to use them.',
                                 model)
                    _logger.info('Original random object: %r', obj)
                    _logger.info('Reconstructed object: %r', reconstructed)
                    _logger.info('Built-in representation: %r', bi)
def make_random_object(model: pydsdl.SerializableType) -> typing.Any:
"""
Returns an object of the specified DSDL type populated with random data.

Booleans are chosen by a coin flip, integers are drawn uniformly from the inclusive value range of the type,
and floats are built from random bit patterns, so infinities and NaN may be produced.
"""
# Local helper: a coin flip that yields True or False with equal probability.
def fifty_fifty() -> bool:
return random.random() >= 0.5
if isinstance(model, pydsdl.BooleanType):
return fifty_fifty()
elif isinstance(model, pydsdl.IntegerType): # noinspection PyTypeChecker
return random.randint(int(model.inclusive_value_range.min),
int(model.inclusive_value_range.max))
elif isinstance(model, pydsdl.FloatType): # We want inf/nan as well, so we generate int and then reinterpret
# A random unsigned integer of the same width is packed to bytes and unpacked again as a float.
int_value = random.randrange(0, 2 ** model.bit_length)
# Pairs of struct format characters per bit length: (float format, unsigned integer format of equal size).
unpack_fmt, pack_fmt = {
16: ('e', 'H'),
32: ('f', 'I'),
64: ('d', 'Q'),
}[model.bit_length]
# '<' selects little-endian byte order with standard sizes for both packing and unpacking.
fmt_prefix = '<'
out, = struct.unpack(fmt_prefix + unpack_fmt, struct.pack(fmt_prefix + pack_fmt, int_value))
return out
elif isinstance(model, pydsdl.FixedLengthArrayType):
et = model.element_type
# Arrays of 8-bit unsigned integers are filled in one call using NumPy's random generator
# (not the `random` module used elsewhere in this function) instead of element by element.
if isinstance(et, pydsdl.UnsignedIntegerType) and et.bit_length == 8: # Special case for faster testing
out = numpy.random.randint(0, 256, size=model.capacity, dtype=numpy.uint8)
else:
out = [make_random_object(model.element_type) for _ in range(model.capacity)]
# NOTE(review): `out` is assigned above but is neither returned nor used within this excerpt;
# the remainder of this branch appears to be missing here - confirm against the complete function.
# Fragment: continuation of an if/elif chain whose beginning is not part of this excerpt.
# NOTE(review): the recursive calls suggest that this is the tail of `_are_close` - confirm.

# Composites: the Python types must match, then every non-padding field is compared recursively.
elif isinstance(data_type, pydsdl.CompositeType):
if type(a) != type(b): # pragma: no cover
return False
for f in pyuavcan.dsdl.get_type(a).fields_except_padding: # pragma: no cover
if not _are_close(f.data_type,
pyuavcan.dsdl.get_attribute(a, f.name),
pyuavcan.dsdl.get_attribute(b, f.name)):
return False
return True # Empty objects of same type compare equal
# Arrays: lengths and dtypes must agree, then the elements are compared pairwise.
elif isinstance(data_type, pydsdl.ArrayType):
return all(starmap(partial(_are_close, data_type.element_type), zip(a, b))) \
if len(a) == len(b) and a.dtype == b.dtype else False
# Floats: both operands are converted to the NumPy float type that matches the declared bit length
# before the comparison, and NaN is treated as equal to NaN.
elif isinstance(data_type, pydsdl.FloatType):
t = {
16: numpy.float16,
32: numpy.float32,
64: numpy.float64,
}[data_type.bit_length]
return numpy.allclose(t(a), t(b), equal_nan=True)
# Everything else is compared with the default settings of numpy.allclose (NaN does not equal NaN there).
else:
return numpy.allclose(a, b)
# Fragment: the header of the enclosing function is not part of this excerpt, and neither is the code
# that produces `chunks` and `ser_sample`.
# It deserializes `chunks` into the type of `obj`, checks that the result matches `obj`,
# and returns the serialization and deserialization timings as a pair.

# Deserialization, timed using the process CPU clock.
ts = time.process_time()
d = pyuavcan.dsdl.deserialize(type(obj), chunks) # GC must be disabled while we're in the timed context
des_sample = time.process_time() - ts
# The measurement is done, so garbage collection is switched back on.
gc.enable()
# The result must exist, and must have the same Python type and the same DSDL model as the original.
assert d is not None
assert type(obj) is type(d)
assert pyuavcan.dsdl.get_model(obj) == pyuavcan.dsdl.get_model(d)
# Approximate value comparison; the failure message (both objects plus the serialized bytes in hex)
# is built only if the comparison actually fails.
if not _util.are_close(pyuavcan.dsdl.get_model(obj), obj, d): # pragma: no cover
assert False, f'{obj} != {d}; sr: {bytes().join(chunks).hex()}' # Branched for performance reasons
# Similar floats may produce drastically different string representations, so if there is at least one float inside,
# we skip the string representation equality check.
if pydsdl.FloatType.__name__ not in repr(pyuavcan.dsdl.get_model(d)):
assert str(obj) == str(d)
assert repr(obj) == repr(d)
return ser_sample, des_sample
def _unittest_slow_constants(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None:
    """
    Checks the generated classes against the DSDL models they were generated from.

    For every model, including services, each constant of a primitive type must be approximately equal
    to the attribute of the same name on the generated class. In addition, a class that is a fixed port
    object must also be a composite object, and its fixed port-ID must equal the one stated by its model.

    :param generated_packages: The generated packages whose models are to be checked.
    """
    for package in generated_packages:
        for model in _util.expand_service_types(package.models, keep_services=True):
            cls = pyuavcan.dsdl.get_class(model)

            for constant in model.constants:
                if isinstance(constant.data_type, pydsdl.PrimitiveType):  # pragma: no branch
                    expected = constant.value
                    actual = pyuavcan.dsdl.get_attribute(cls, constant.name)
                    assert isinstance(expected, pydsdl.Primitive)
                    assert expected.native_value == pytest.approx(actual), \
                        'The generated constant does not compare equal against the DSDL source'

            if issubclass(cls, pyuavcan.dsdl.FixedPortObject):
                assert issubclass(cls, pyuavcan.dsdl.CompositeObject)
                assert issubclass(cls, pyuavcan.dsdl.FixedPortObject)
                generated_port_id = pyuavcan.dsdl.get_fixed_port_id(cls)
                declared_port_id = pyuavcan.dsdl.get_model(cls).fixed_port_id
                assert generated_port_id == declared_port_id