How to use the asammdf.blocks.v2_v3_constants module in asammdf

To help you get started, we’ve selected a few asammdf examples based on popular ways the library is used in public projects.
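
A quick note on naming: inside the asammdf sources the module is imported under the short alias v23c (or v3c in the benchmarks), and that alias is what you will see in the snippets below. A minimal sketch of the import, printing a couple of the constants used later on:

import asammdf.blocks.v2_v3_constants as v23c

# the module only holds MDF version 2/3 constants: block sizes, data type
# codes, conversion type codes, source types and record locations
print(v23c.LOCATION_ORIGINAL_FILE, v23c.LOCATION_TEMPORARY_FILE)
print(v23c.CONVERSION_TYPE_NONE, v23c.CONVERSION_TYPE_LINEAR)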


github danielhrisca / asammdf / asammdf / blocks / mdf_v3.py
def _load_data(self, group, record_offset=0, record_count=None, optimize_read=True):
        """ get group's data block bytes"""
        has_yielded = False
        offset = 0
        _count = record_count
        channel_group = group.channel_group

        if group.data_location == v23c.LOCATION_ORIGINAL_FILE:
            # go to the first data block of the current data group
            stream = self._file
        else:
            stream = self._tempfile

        record_offset *= channel_group.samples_byte_nr

        # go to the first data block of the current data group
        if group.sorted:
            samples_size = channel_group.samples_byte_nr
            if not samples_size:
                yield b"", 0, _count
                has_yielded = True
            else:
                if self._read_fragment_size:
                    split_size = self._read_fragment_size // samples_size
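
The LOCATION_* constants checked at the top of _load_data can also be inspected from outside the reader. A minimal sketch, assuming a hypothetical MDF 3.x file named example.mdf and using the groups list that the MDF object exposes:

import asammdf.blocks.v2_v3_constants as v23c
from asammdf import MDF

# "example.mdf" is a placeholder for an MDF version 3.x measurement
with MDF("example.mdf") as mdf:
    for index, group in enumerate(mdf.groups):
        # the same check _load_data performs: read samples from the
        # original file or from asammdf's temporary work file
        if group.data_location == v23c.LOCATION_ORIGINAL_FILE:
            print(f"group {index}: data in the original file")
        else:
            print(f"group {index}: data in the temporary file")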
github danielhrisca / asammdf / asammdf / blocks / mdf_v3.py
def _get_not_byte_aligned_data(self, data, group, ch_nr):

        big_endian_types = (
            v23c.DATA_TYPE_UNSIGNED_MOTOROLA,
            v23c.DATA_TYPE_FLOAT_MOTOROLA,
            v23c.DATA_TYPE_DOUBLE_MOTOROLA,
            v23c.DATA_TYPE_SIGNED_MOTOROLA,
        )

        record_size = group.channel_group.samples_byte_nr

        channel = group.channels[ch_nr]

        bit_offset = channel.start_offset % 8
        byte_offset = channel.start_offset // 8
        bit_count = channel.bit_count

        byte_count = bit_offset + bit_count
        if byte_count % 8:
            byte_count = (byte_count // 8) + 1
        else:
            byte_count //= 8
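
The Motorola data type codes collected into big_endian_types above can be reused to classify raw channel type codes. An illustrative helper (not part of asammdf's API):

import asammdf.blocks.v2_v3_constants as v23c

MOTOROLA_TYPES = (
    v23c.DATA_TYPE_UNSIGNED_MOTOROLA,
    v23c.DATA_TYPE_FLOAT_MOTOROLA,
    v23c.DATA_TYPE_DOUBLE_MOTOROLA,
    v23c.DATA_TYPE_SIGNED_MOTOROLA,
)

def is_big_endian(data_type: int) -> bool:
    # Motorola types store the most significant byte first
    return data_type in MOTOROLA_TYPES

print(is_big_endian(v23c.DATA_TYPE_SIGNED_MOTOROLA))  # True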
github danielhrisca / asammdf / asammdf / blocks / mdf_v3.py
ch_cntr += 1

            # simple channels don't have channel dependencies
            gp_dep.append(None)

        # channel group
        kargs = {
            "cycles_nr": cycles_nr,
            "samples_byte_nr": offset // 8,
            "ch_nr": ch_cntr,
        }
        if self.version >= "3.30":
            kargs["block_len"] = v23c.CG_POST_330_BLOCK_SIZE
        else:
            kargs["block_len"] = v23c.CG_PRE_330_BLOCK_SIZE
        gp.channel_group = ChannelGroup(**kargs)
        gp.channel_group.comment = source_info

        # data group
        if self.version >= "3.20":
            block_len = v23c.DG_POST_320_BLOCK_SIZE
        else:
            block_len = v23c.DG_PRE_320_BLOCK_SIZE
        gp.data_group = DataGroup(block_len=block_len)

        # data block
        types = dtype(types)

        gp.types = types
        gp.parents = parents
        gp.sorted = True
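
The version checks above are the usual pattern for picking block sizes out of v2_v3_constants. A condensed sketch using the same ChannelGroup and DataGroup classes, which mdf_v3.py imports from asammdf.blocks.v2_v3_blocks:

import asammdf.blocks.v2_v3_constants as v23c
from asammdf.blocks.v2_v3_blocks import ChannelGroup, DataGroup

version = "3.30"  # assumed target MDF version string

# the CG block layout grew at 3.30 and the DG block layout at 3.20,
# so block_len has to match the version being written
cg_block_len = v23c.CG_POST_330_BLOCK_SIZE if version >= "3.30" else v23c.CG_PRE_330_BLOCK_SIZE
dg_block_len = v23c.DG_POST_320_BLOCK_SIZE if version >= "3.20" else v23c.DG_PRE_320_BLOCK_SIZE

channel_group = ChannelGroup(
    cycles_nr=0, samples_byte_nr=0, ch_nr=0, block_len=cg_block_len
)
data_group = DataGroup(block_len=dg_block_len)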
github danielhrisca / asammdf / asammdf / blocks / mdf_v3.py
name = new_ch.name

                # handle multiple occurrences of the same channel name
                name = group_channels.get_unique_name(name)

                if start_offset >= next_byte_aligned_position:
                    parent_start_offset = (start_offset // 8) * 8

                    # check if there are byte gaps in the record
                    gap = (parent_start_offset - next_byte_aligned_position) // 8
                    if gap:
                        types.append(("", f"V{gap}"))

                    # adjust size to 1, 2, 4 or 8 bytes for nonstandard integers
                    size = bit_offset + bit_count
                    if data_type == v23c.DATA_TYPE_STRING:
                        next_byte_aligned_position = parent_start_offset + size
                        if next_byte_aligned_position <= record_size:
                            dtype_pair = (name, get_fmt_v3(data_type, size))
                            types.append(dtype_pair)
                            parents[original_index] = name, bit_offset

                        else:
                            next_byte_aligned_position = parent_start_offset

                    elif data_type == v23c.DATA_TYPE_BYTEARRAY:
                        next_byte_aligned_position = parent_start_offset + size
                        if next_byte_aligned_position <= record_size:
                            dtype_pair = (name, get_fmt_v3(data_type, size))
                            types.append(dtype_pair)
                            parents[original_index] = name, bit_offset
                        else:
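
The ("", f"V{gap}") entries appended above turn into numpy void fields that pad byte gaps inside the record dtype. A small standalone illustration of that trick:

import numpy as np

# two channels with a 3 byte gap between them in the raw record
types = [("Channel_0", "<u2"), ("", "V3"), ("Channel_1", "<f4")]
record_dtype = np.dtype(types)

print(record_dtype.itemsize)  # 2 + 3 + 4 = 9 bytes per record
print(record_dtype.names)     # the gap gets an auto-generated field name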
github danielhrisca / asammdf / asammdf / blocks / mdf_v3.py
record.setflags(write=False)

                    vals = record[parent]
                    data_type = channel.data_type
                    size = vals.dtype.itemsize
                    if data_type == v23c.DATA_TYPE_BYTEARRAY:
                        size *= vals.shape[1]

                    vals_dtype = vals.dtype.kind
                    if vals_dtype not in "ui" and (bit_offset or not bits == size * 8):
                        vals = self._get_not_byte_aligned_data(data_bytes, grp, ch_nr)
                    else:
                        dtype_ = vals.dtype
                        kind_ = dtype_.kind

                        if data_type in v23c.INT_TYPES:
                            if kind_ == "f":
                                if bits != size * 8:
                                    vals = self._get_not_byte_aligned_data(
                                        data_bytes, grp, ch_nr
                                    )
                                else:
                                    dtype_fmt = get_fmt_v3(data_type, bits)
                                    channel_dtype = dtype(dtype_fmt.split(")")[-1])
                                    vals = vals.view(channel_dtype)
                            else:

                                if dtype_.byteorder == ">":
                                    if bit_offset or bits != size << 3:
                                        vals = self._get_not_byte_aligned_data(
                                            data_bytes, grp, ch_nr
                                        )
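
v23c.INT_TYPES, used above to decide when a value view has to be reinterpreted, is the collection of all MDF v3 integer data type codes (DATA_TYPE_UNSIGNED, DATA_TYPE_FLOAT and friends are also defined in the module). A quick membership check:

import asammdf.blocks.v2_v3_constants as v23c

# unsigned/signed codes (Intel and Motorola order) are integer types,
# float and double codes are not
print(v23c.DATA_TYPE_UNSIGNED in v23c.INT_TYPES)           # True
print(v23c.DATA_TYPE_UNSIGNED_MOTOROLA in v23c.INT_TYPES)  # True
print(v23c.DATA_TYPE_FLOAT in v23c.INT_TYPES)              # False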
github danielhrisca / asammdf / benchmarks / bench.py
name="Channel_{}".format(i),
            unit="unit_{}".format(i),
            conversion=None,
            comment="Unsigned int 16bit channel {}".format(i),
            raw=True,
        )
        sigs.append(sig)
    mdf.append(sigs, common_timebase=True)

    # linear
    sigs = []
    for i in range(channels_count):
        conversion = {
            "conversion_type": v4c.CONVERSION_TYPE_LIN
            if version >= "4.00"
            else v3c.CONVERSION_TYPE_LINEAR,
            "a": float(i),
            "b": -0.5,
        }
        sig = Signal(
            np.ones(cycles, dtype=np.int64),
            t,
            name="Channel_{}".format(i),
            unit="unit_{}".format(i),
            conversion=cls(**conversion),
            comment="Signed 16bit channel {} with linear conversion".format(i),
            raw=True,
        )
        sigs.append(sig)
    mdf.append(sigs, common_timebase=True)

    # algebraic
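
A trimmed-down version of the benchmark's linear conversion setup, keeping only the MDF 3.x branch. As an assumption here, the conversion is passed to Signal as a plain dict (recent asammdf releases accept that) instead of instantiating the conversion class (cls) that the benchmark uses:

import numpy as np
from asammdf import MDF, Signal
import asammdf.blocks.v2_v3_constants as v3c

cycles = 100
t = np.arange(cycles, dtype=np.float64)

# physical = a * raw + b
conversion = {
    "conversion_type": v3c.CONVERSION_TYPE_LINEAR,
    "a": 2.0,
    "b": -0.5,
}

sig = Signal(
    samples=np.ones(cycles, dtype=np.int64),
    timestamps=t,
    name="Channel_0",
    unit="unit_0",
    conversion=conversion,
    comment="channel with linear conversion",
    raw=True,
)

mdf = MDF(version="3.30")
mdf.append([sig], common_timebase=True)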
github danielhrisca / asammdf / asammdf / blocks / mdf_v3.py
unit = conversion.unit
            else:
                unit = ""

            comment = channel.comment

            description = channel.description.decode("latin-1").strip(" \t\n\0")
            if comment:
                comment = f"{comment}\n{description}"
            else:
                comment = description

            source = channel.source

            if source:
                if source["type"] == v23c.SOURCE_ECU:
                    source = SignalSource(
                        source.name,
                        source.path,
                        source.comment,
                        0,  # source type other
                        0,  # bus type none
                    )
                else:
                    source = SignalSource(
                        source.name,
                        source.path,
                        source.comment,
                        2,  # source type bus
                        2,  # bus type CAN
                    )
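
The branch above maps the raw v23c.SOURCE_ECU code onto a generic signal source. A tiny illustrative helper (not asammdf API) that performs the same classification on a raw source type code:

import asammdf.blocks.v2_v3_constants as v23c

def classify_source(source_type: int) -> str:
    # MDF 3.x only distinguishes plain ECU sources from vector/CAN bus sources
    if source_type == v23c.SOURCE_ECU:
        return "ECU source"
    return "bus (CAN) source"

print(classify_source(v23c.SOURCE_ECU))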
github danielhrisca / asammdf / asammdf / blocks / mdf_v3.py
# data block
        types = dtype(types)

        gp.types = types
        gp.parents = parents
        gp.sorted = True

        if df.shape[0]:
            samples = fromarrays(fields, dtype=types)
        else:
            samples = array([])

        block = samples.tostring()

        gp.data_location = v23c.LOCATION_TEMPORARY_FILE
        if cycles_nr:
            data_address = tell()
            gp.data_group.data_block_addr = data_address
            size = len(block)
            self._tempfile.write(block)

            gp.data_blocks.append(
                DataBlockInfo(
                    address=data_address,
                    block_type=0,
                    raw_size=size,
                    size=size,
                    param=0,
                )
            )
        else:
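
The block written to the temporary file above is just a structured numpy record array serialized to bytes. A standalone sketch of that step, using tobytes() since tostring() is deprecated in modern NumPy:

import numpy as np

# one record dtype holding every channel of the (sorted) group
types = np.dtype([("time", "<f8"), ("Channel_0", "<u2")])
samples = np.rec.fromarrays(
    [np.arange(5, dtype="<f8"), np.arange(5, dtype="<u2")], dtype=types
)

block = samples.tobytes()
print(len(block))  # 5 records * (8 + 2) bytes = 50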
github danielhrisca / asammdf / asammdf / blocks / mdf_v3.py
}

            if s_size < 8:
                s_size = 8

            channel = Channel(**kargs)
            channel.name = name
            channel.source = new_source

            unit = units.get(name, b"")
            if unit:
                if hasattr(unit, "encode"):
                    unit = unit.encode("latin-1")
                # conversion for time channel
                kargs = {
                    "conversion_type": v23c.CONVERSION_TYPE_NONE,
                    "unit": unit,
                    "min_phy_value": 0,
                    "max_phy_value": 0,
                }
                conversion = ChannelConversion(**kargs)
                conversion.unit = unit

            gp_channels.append(channel)

            offset += s_size

            self.channels_db.add(name, (dg_cntr, ch_cntr))

            # update the parents as well
            field_name = field_names.get_unique_name(name)
            parents[ch_cntr] = field_name, 0
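
The identity conversion built above only carries the unit string. A minimal sketch that creates the same block directly, mirroring the keyword arguments used in the snippet:

import asammdf.blocks.v2_v3_constants as v23c
from asammdf.blocks.v2_v3_blocks import ChannelConversion

unit = "s".encode("latin-1")

# "no conversion" block whose only job is to store the unit
conversion = ChannelConversion(
    conversion_type=v23c.CONVERSION_TYPE_NONE,
    unit=unit,
    min_phy_value=0,
    max_phy_value=0,
)
conversion.unit = unit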
github danielhrisca / asammdf / asammdf / mdf.py
if hasattr(ch, key):
                            addr = getattr(ch, key)
                        else:
                            addr = 0
                        if addr and addr not in texts:
                            stream.seek(addr + 2)
                            size = UINT16_u(stream.read(2))[0] - 4
                            texts[addr + 4] = randomized_string(size)

                    texts[ch.address + 26] = randomized_string(32)
                    texts[ch.address + 58] = randomized_string(128)

                    source = ch.source_addr
                    if source:
                        source = ChannelExtension(address=source, stream=stream)
                        if source.type == v23c.SOURCE_ECU:
                            texts[source.address + 12] = randomized_string(80)
                            texts[source.address + 92] = randomized_string(32)
                        else:
                            texts[source.address + 14] = randomized_string(36)
                            texts[source.address + 50] = randomized_string(36)

                    conv = ch.conversion_addr
                    if conv:
                        texts[conv + 22] = randomized_string(20)

                        conv = ChannelConversion(address=conv, stream=stream)

                        if conv.conversion_type == v23c.CONVERSION_TYPE_FORMULA:
                            texts[conv.address + 36] = randomized_string(conv.block_len - 36)

                        if conv.referenced_blocks: