Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_wrong_header_version(self):
    """Verify that ``MDF`` refuses a stream with a wrong header version.

    A small in-memory byte stream is handed to ``MDF``; the test passes
    only if constructing the object raises ``MdfException``.
    """
    # Raw bytes used as the fake file content; kept exactly as in the
    # original test data.
    raw_header = b"MDF AAAA amdf500d\x00\x00\x00\x00\x9f\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    stream = BytesIO(raw_header)

    # Callable form of assertRaises: invokes MDF(stream) and fails the
    # test unless MdfException is raised.
    self.assertRaises(MdfException, MDF, stream)
display_name="",
attachment=(),
source=None,
bit_count=None,
stream_sync=False,
invalidation_bits=None,
encoding=None,
):
if samples is None or timestamps is None or not name:
message = (
'"samples", "timestamps" and "name" are mandatory '
"for Signal class __init__: samples={samples}\n"
"timestamps={timestamps}\nname={name}"
)
raise MdfException(message)
else:
if not isinstance(samples, np.ndarray):
samples = np.array(samples)
if not isinstance(timestamps, np.ndarray):
timestamps = np.array(timestamps, dtype=np.float64)
if samples.shape[0] != timestamps.shape[0]:
message = "{} samples and timestamps length mismatch ({} vs {})"
message = message.format(name, samples.shape[0], timestamps.shape[0])
logger.exception(message)
raise MdfException(message)
self.samples = samples
self.timestamps = timestamps
self.unit = unit
self.name = name
self.comment = comment
self._plot_axis = None
self.abs_time,
self.tz_offset,
self.time_quality,
self.timer_identification,
) = unpack(
v23c.HEADER_POST_320_EXTRA_FMT,
stream.read(v23c.HEADER_POST_320_EXTRA_SIZE),
)
if self.id != b"HD":
message = (
f'Expected "HD" block @{hex(self.address)} but found "{self.id}"'
)
message = message.format(hex(64), self.id)
logger.exception(message)
raise MdfException(message)
if self.program_addr:
self.program = ProgramBlock(address=self.program_addr, stream=stream)
if self.comment_addr:
self.comment = get_text_v3(address=self.comment_addr, stream=stream)
except KeyError:
version = kwargs.get("version", "3.20")
self.id = b"HD"
self.block_len = 208 if version >= "3.20" else 164
self.first_dg_addr = 0
self.comment_addr = 0
self.program_addr = 0
self.dg_nr = 0
self.author_field = "{:0<32}".format(getuser()).encode("latin-1")
self.department_field = "{:0<32}".format("").encode("latin-1")
0-based channel index
Returns
-------
group_index, channel_index : (int, int)
selected channel's group and channel index
"""
suppress = True
if name is None:
if group is None or index is None:
message = (
"Invalid arguments for channel selection: "
'must give "name" or, "group" and "index"'
)
raise MdfException(message)
else:
gp_nr, ch_nr = group, index
if ch_nr >= 0:
if gp_nr > len(self.groups) - 1:
raise MdfException("Group index out of range")
if index > len(self.groups[gp_nr].channels) - 1:
raise MdfException("Channel index out of range")
else:
if name not in self.channels_db:
raise MdfException(f'Channel "{name}" not found')
else:
if source is not None:
for gp_nr, ch_nr in self.channels_db[name]:
source_name = self._get_source_name(gp_nr, ch_nr)
if source_name == source:
break
if index is not None and index < 0:
gp_nr = group
ch_nr = index
else:
for gp_nr, ch_nr in self.channels_db[name]:
if gp_nr == group:
if index is None:
break
elif index == ch_nr:
break
else:
if index is None:
message = f'Channel "{name}" not found in group {group}'
else:
message = f'Channel "{name}" not found in group {group} at index {index}'
raise MdfException(message)
return gp_nr, ch_nr
if group is None or index is None:
message = (
"Invalid arguments for channel selection: "
'must give "name" or, "group" and "index"'
)
raise MdfException(message)
else:
gp_nr, ch_nr = group, index
if ch_nr >= 0:
if gp_nr > len(self.groups) - 1:
raise MdfException("Group index out of range")
if index > len(self.groups[gp_nr].channels) - 1:
raise MdfException("Channel index out of range")
else:
if name not in self.channels_db:
raise MdfException(f'Channel "{name}" not found')
else:
if source is not None:
for gp_nr, ch_nr in self.channels_db[name]:
source_name = self._get_source_name(gp_nr, ch_nr)
if source_name == source:
break
else:
raise MdfException(f"{name} with source {source} not found")
elif group is None:
gp_nr, ch_nr = self.channels_db[name][0]
if len(self.channels_db[name]) > 1 and not suppress:
message = (
f'Multiple occurances for channel "{name}". '
f"Using first occurance from data group {gp_nr}. "
'Provide both "group" and "index" arguments'
last_timestamps = [None for gp in mdf.groups]
groups_nr = len(mdf.groups)
cg_nr = -1
for i, group in enumerate(mdf.groups):
included_channels = mdf._included_channels(i)
if mdf_index == 0:
included_channel_names.append(
[group.channels[k].name for k in included_channels]
)
else:
names = [group.channels[k].name for k in included_channels]
if names != included_channel_names[i]:
if sorted(names) != sorted(included_channel_names[i]):
raise MdfException(f"internal structure of file {mdf_index} is different")
else:
logger.warning(
f'Different channel order in channel group {i} of file {mdf_index}.'
' Data can be corrupted if the there are channels with the same '
'name in this channel group'
)
included_channels = [
mdf._validate_channel_selection(
name=name_,
group=i,
)[1]
for name_ in included_channel_names[i]
]
if included_channels:
cg_nr += 1
def __init__(self, name=None, version="4.10", **kwargs):
if name:
if is_file_like(name):
file_stream = name
else:
name = Path(name)
if name.is_file():
file_stream = open(name, "rb")
else:
raise MdfException(f'File "{name}" does not exist')
file_stream.seek(0)
magic_header = file_stream.read(3)
if magic_header != b"MDF":
raise MdfException(f'"{name}" is not a valid ASAM MDF file')
file_stream.seek(8)
version = file_stream.read(4).decode("ascii").strip(" \0")
if not version:
file_stream.read(16)
version = unpack("
"""
suppress = True
if name is None:
if group is None or index is None:
message = (
"Invalid arguments for channel selection: "
'must give "name" or, "group" and "index"'
)
raise MdfException(message)
else:
gp_nr, ch_nr = group, index
if ch_nr >= 0:
if gp_nr > len(self.groups) - 1:
raise MdfException("Group index out of range")
if index > len(self.groups[gp_nr].channels) - 1:
raise MdfException("Channel index out of range")
else:
if name not in self.channels_db:
raise MdfException(f'Channel "{name}" not found')
else:
if source is not None:
for gp_nr, ch_nr in self.channels_db[name]:
source_name = self._get_source_name(gp_nr, ch_nr)
if source_name == source:
break
else:
raise MdfException(f"{name} with source {source} not found")
elif group is None:
gp_nr, ch_nr = self.channels_db[name][0]
if len(self.channels_db[name]) > 1 and not suppress:
message = (
comment="">
"""
if version is None:
version = self.version
else:
version = validate_version_argument(version)
# group channels by group index
gps = {}
for item in channels:
if isinstance(item, (list, tuple)):
if len(item) not in (2, 3):
raise MdfException(
"The items used for filtering must be strings, "
"or they must match the first 3 argumens of the get "
"method"
)
else:
group, index = self._validate_channel_selection(*item)
if group not in gps:
gps[group] = {index}
else:
gps[group].add(index)
else:
name = item
group, index = self._validate_channel_selection(name)
if group not in gps:
gps[group] = {index}
else: