def __init__(self, f, endianness, scaler_type):
    """
    Read the metadata for a DAQmx raw segment. This is the raw
    DAQmx-specific portion of the raw data index.
    """
    self.scaler_type = scaler_type
    self.data_type = types.tds_data_types[0xFFFFFFFF]
    self.dimension = types.Uint32.read(f, endianness)
    # In TDMS format version 2.0, 1 is the only valid value for dimension
    if self.dimension != 1:
        raise ValueError("Data dimension is not 1")
    self.chunk_size = types.Uint64.read(f, endianness)

    # size of vector of format changing scalers
    scaler_vector_length = types.Uint32.read(f, endianness)
    self.scalers = [
        DaqMxScaler(f, endianness, scaler_type)
        for _ in range(scaler_vector_length)]

    # Read raw data widths.
    # This is an array of widths in bytes, which should be the same
    # for all channels that have DAQmx data in a segment.
    # There is one element per acquisition card, as data is interleaved
    # separately for each card.
    raw_data_widths_length = types.Uint32.read(f, endianness)
    self.raw_data_widths = np.zeros(raw_data_widths_length, dtype=np.int32)
    for width_idx in range(raw_data_widths_length):
        self.raw_data_widths[width_idx] = types.Uint32.read(f, endianness)
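# Illustrative only: a hedged, standalone sketch (not npTDMS code) of the raw
# data index layout implied by the reads above. It packs a little-endian index
# with dimension 1, a chunk size, zero format changing scalers and two raw data
# widths, then parses it back with struct. When the scaler count is non-zero,
# the scaler entries would sit between that count and the widths vector.
# Endianness and the example values are assumptions for this sketch.
import io
import struct

buf = io.BytesIO(
    struct.pack('<I', 1) +           # dimension, must be 1 in TDMS 2.0
    struct.pack('<Q', 4096) +        # chunk size
    struct.pack('<I', 0) +           # number of format changing scalers
    struct.pack('<I', 2) +           # length of the raw data widths vector
    struct.pack('<2I', 4, 4))        # one width in bytes per acquisition card

dimension, = struct.unpack('<I', buf.read(4))
chunk_size, = struct.unpack('<Q', buf.read(8))
scaler_count, = struct.unpack('<I', buf.read(4))
widths_count, = struct.unpack('<I', buf.read(4))
raw_data_widths = struct.unpack('<%dI' % widths_count, buf.read(4 * widths_count))
assert dimension == 1 and chunk_size == 4096 and raw_data_widths == (4, 4)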
toc_mask = types.Int32.read(file)

if log.isEnabledFor(logging.DEBUG):
    for prop_name, prop_mask in toc_properties.items():
        prop_is_set = (toc_mask & prop_mask) != 0
        log.debug("Property %s is %s", prop_name, prop_is_set)

endianness = '>' if (toc_mask & toc_properties['kTocBigEndian']) else '<'

# Next four bytes are version number
version = types.Int32.read(file, endianness)
if version not in (4712, 4713):
    log.warning("Unrecognised version number.")

# Now 8 bytes each for the offset values
next_segment_offset = types.Uint64.read(file, endianness)
raw_data_offset = types.Uint64.read(file, endianness)

# Calculate data and next segment position
lead_size = 7 * 4
data_position = segment_position + lead_size + raw_data_offset
if next_segment_offset == 0xFFFFFFFFFFFFFFFF:
    # Segment size is unknown. This can happen if LabVIEW crashes.
    # Try to read until the end of the file.
    log.warning(
        "Last segment of file has unknown size, "
        "will attempt to read to the end of the file")
    next_segment_pos = self._get_data_file_size()
    next_segment_offset = next_segment_pos - segment_position - lead_size
else:
    log.debug("Next segment offset = %d, raw data offset = %d",
              next_segment_offset, raw_data_offset)
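# Illustrative only: a hedged sketch (not npTDMS code) of the 28-byte segment
# lead-in that the code above walks through: a 4-byte "TDSm" tag, a 4-byte ToC
# mask, a 4-byte version number, then two 8-byte offsets (hence lead_size of
# 7 * 4 bytes). The kTocBigEndian bit value (1 << 6) and the example field
# values are assumptions for this sketch.
import io
import struct

K_TOC_BIG_ENDIAN = 1 << 6  # assumed bit for the kTocBigEndian ToC flag

lead_in = io.BytesIO(
    b'TDSm' +
    struct.pack('<i', 0) +                    # ToC mask, no flags set
    struct.pack('<i', 4713) +                 # version number for TDMS 2.0
    struct.pack('<Q', 0xFFFFFFFFFFFFFFFF) +   # next segment offset unknown
    struct.pack('<Q', 200))                   # raw data offset

tag = lead_in.read(4)
toc_mask, = struct.unpack('<i', lead_in.read(4))
byte_order = '>' if toc_mask & K_TOC_BIG_ENDIAN else '<'
version, = struct.unpack(byte_order + 'i', lead_in.read(4))
next_segment_offset, raw_data_offset = struct.unpack(
    byte_order + 'QQ', lead_in.read(16))
assert tag == b'TDSm' and version == 4713
assert next_segment_offset == 0xFFFFFFFFFFFFFFFF  # size unknown, read to EOF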
log.debug("Data size = %d b",
self.data_type != types.String):
raise ValueError(
"Unsupported data type: %r" % self.data_type)
# Read data dimension
dimension = types.Uint32.read(f, self.endianness)
# In TDMS version 2.0, 1 is the only valid value for dimension
if dimension != 1:
raise ValueError("Data dimension is not 1")
# Read number of values
self.number_values = types.Uint64.read(f, self.endianness)
# Variable length data types have total size
if self.data_type in (types.String,):
self.data_size = types.Uint64.read(f, self.endianness)
else:
self.data_size = self.number_values * self.data_type.size
log.debug(
"Object number of values in segment: %d", self.number_values)
raise KeyError("Unrecognised data type")
log.debug("Object data type: %s", self.data_type.__name__)
if (self.data_type.size is None and
        self.data_type != types.String):
    raise ValueError(
        "Unsupported data type: %r" % self.data_type)

# Read data dimension
dimension = types.Uint32.read(f, self.endianness)
# In TDMS version 2.0, 1 is the only valid value for dimension
if dimension != 1:
    raise ValueError("Data dimension is not 1")

# Read number of values
self.number_values = types.Uint64.read(f, self.endianness)

# Variable length data types have total size
if self.data_type in (types.String,):
    self.data_size = types.Uint64.read(f, self.endianness)
else:
    self.data_size = self.number_values * self.data_type.size

log.debug(
    "Object number of values in segment: %d", self.number_values)