def read_raw_data(self, f):
    """Read raw data from a TDMS segment

    :returns: A generator of RawDataChunk objects with raw channel data for
        objects in this segment.
    """
    if not self.toc_mask & toc_properties['kTocRawData']:
        # This segment has no raw data, so yield a single empty chunk
        yield RawDataChunk.empty()
        return
    f.seek(self.data_position)
    total_data_size = self.next_segment_offset - self.raw_data_offset
    log.debug(
        "Reading %d bytes of data at %d in %d chunks",
        total_data_size, f.tell(), self.num_chunks)
    data_objects = [o for o in self.ordered_objects if o.has_data]
    for chunk in range(self.num_chunks):
        yield self._read_data_chunk(f, data_objects, chunk)
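A minimal usage sketch, not from the library itself: assuming `segment` is a parsed TDMS segment and `f` is the open file object, and that each RawDataChunk exposes its per-channel data as a `channel_data` dict, the generator can be drained chunk by chunk. `collect_segment_data` is a hypothetical helper name.

def collect_segment_data(segment, f):
    # Collect each channel's chunks into a list keyed by object path
    collected = {}
    for chunk in segment.read_raw_data(f):
        for path, channel_chunk in chunk.channel_data.items():
            collected.setdefault(path, []).append(channel_chunk)
    return collected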
def _read_interleaved_numpy(self, file, data_objects):
    """Read interleaved data where all channels have a numpy type"""
    log.debug("Reading interleaved data all at once")
    # Read the whole chunk into a 2D array of unsigned bytes,
    # with one row per interleaved data point
    point_width = sum(o.data_type.size for o in data_objects)
    number_points = data_objects[0].number_values
    combined_data = np.frombuffer(
        file.read(point_width * number_points),
        dtype=np.uint8).reshape(-1, point_width)
    channel_data = {}
    data_pos = 0
    for (i, obj) in enumerate(data_objects):
        byte_columns = tuple(
            range(data_pos, data_pos + obj.data_type.size))
        log.debug("Byte columns for channel %d: %s", i, byte_columns)
        # Select columns for this channel, so that the number of values
        # will be number of bytes per point * number of data points,
        # then use ravel to flatten the results into a vector
        object_data = combined_data[:, byte_columns].ravel()
        if obj.data_type.nptype is not None:
            # Set the correct data type, so the array length is correct
            object_data.dtype = (
                obj.data_type.nptype.newbyteorder(self.endianness))
        else:
            object_data = obj.data_type.from_bytes(object_data, self.endianness)
        channel_data[obj.path] = object_data
        data_pos += obj.data_type.size
    return RawDataChunk.channel_data(channel_data)
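The byte-column selection above can be hard to picture, so here is a self-contained sketch of the same technique on synthetic data: two interleaved little-endian int16 channels are stored as rows of raw bytes, and slicing the first channel's byte columns, flattening, and reassigning the dtype recovers its values.

import numpy as np

raw = np.array([[1, 0, 2, 0],
                [3, 0, 4, 0]], dtype=np.uint8)  # one row per data point
channel_0 = raw[:, (0, 1)].ravel()  # bytes belonging to the first channel
channel_0.dtype = np.dtype('<i2')   # reinterpret in place as int16
print(channel_0)                    # [1 3]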
log.debug("Reading interleaved data point by point")
object_data = {}
points_added = {}
for obj in data_objects:
object_data[obj.path] = obj.new_segment_data()
points_added[obj.path] = 0
while any([points_added[o.path] < o.number_values
for o in data_objects]):
for obj in data_objects:
if points_added[obj.path] < obj.number_values:
object_data[obj.path][points_added[obj.path]] = (
obj.read_value(file))
points_added[obj.path] += 1
return RawDataChunk.channel_data(object_data)
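To show the round-robin pattern in isolation, here is a runnable sketch with stand-in channel objects (FakeChannel and its attributes are illustrative, not part of the library):

import io
import struct

class FakeChannel:
    """Stand-in for a TDMS channel object (illustrative only)"""
    def __init__(self, path, fmt, number_values):
        self.path = path
        self.fmt = fmt  # struct format for one value, e.g. '<h'
        self.number_values = number_values

    def new_segment_data(self):
        return [None] * self.number_values

    def read_value(self, f):
        # Read and unpack a single value from the file
        return struct.unpack(self.fmt, f.read(struct.calcsize(self.fmt)))[0]

channels = [FakeChannel('/a', '<h', 2), FakeChannel('/b', '<h', 2)]
f = io.BytesIO(struct.pack('<4h', 1, 10, 2, 20))  # interleaved: a, b, a, b
data = {c.path: c.new_segment_data() for c in channels}
added = {c.path: 0 for c in channels}
while any(added[c.path] < c.number_values for c in channels):
    for c in channels:
        if added[c.path] < c.number_values:
            data[c.path][added[c.path]] = c.read_value(f)
            added[c.path] += 1
print(data)  # {'/a': [1, 2], '/b': [10, 20]}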
@staticmethod
def empty():
    return RawDataChunk({})

@staticmethod
def scaler_data(data):
    channel_chunks = {
        path: RawChannelDataChunk.scaler_data(d)
        for (path, d) in data.items()
    }
    return RawDataChunk(channel_chunks)
# Set scaler data for each channel from the raw DAQmx buffer.
# `combined_data` is the chunk read as a 2D array of unsigned bytes,
# with one row per data point.
scaler_data = {}
for obj in data_objects:
    scaler_data[obj.path] = {}
    for scaler in obj.daqmx_metadata.scalers:
        offset = scaler.byte_offset()
        scaler_size = scaler.data_type.size
        byte_columns = tuple(
            range(offset, offset + scaler_size))
        # Select columns for this scaler, so that the number of values
        # will be number of bytes per point * number of data points,
        # then use ravel to flatten the results into a vector
        this_scaler_data = combined_data[:, byte_columns].ravel()
        # Now set the correct data type, so the array length is correct
        this_scaler_data.dtype = (
            scaler.data_type.nptype.newbyteorder(self.endianness))
        if obj.daqmx_metadata.scaler_type == DIGITAL_LINE_SCALER:
            # Digital line scalers only use the least significant bit
            this_scaler_data = np.bitwise_and(this_scaler_data, 1)
        scaler_data[obj.path][scaler.scale_id] = this_scaler_data
return RawDataChunk.scaler_data(scaler_data)
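The bitwise-and with 1 isolates each digital line's state from its raw byte; a standalone illustration on made-up values:

import numpy as np

raw_lines = np.array([0, 1, 3, 254, 255], dtype=np.uint8)
print(np.bitwise_and(raw_lines, 1))  # [0 1 1 0 1]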
@staticmethod
def channel_data(data):
    channel_chunks = {
        path: RawChannelDataChunk.channel_data(d)
        for (path, d) in data.items()
    }
    return RawDataChunk(channel_chunks)
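A short usage sketch of these factory methods (the channel paths and values here are made up for illustration): each factory wraps per-channel values in RawChannelDataChunk instances keyed by object path.

import numpy as np

chunk = RawDataChunk.channel_data({
    "/'group'/'channel1'": np.array([1.0, 2.0, 3.0]),
    "/'group'/'channel2'": np.array([4.0, 5.0, 6.0]),
})
empty_chunk = RawDataChunk.empty()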