Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=None):
"""Read raw data from a TDMS segment
:param f: Open TDMS file object
:param channel_path: Path of channel to read data for
:param chunk_offset: Index of chunk to begin reading from
:param num_chunks: Number of chunks to read, or None to read to the end
:returns: A generator of RawChannelDataChunk objects with raw channel data for
a single channel in this segment.
"""
if not self.toc_mask & toc_properties['kTocRawData']:
yield RawChannelDataChunk.empty()
f.seek(self.data_position)
data_objects = [o for o in self.ordered_objects if o.has_data]
chunk_size = self._get_chunk_size()
if chunk_offset > 0:
f.seek(chunk_size * chunk_offset, os.SEEK_CUR)
stop_chunk = self.num_chunks if num_chunks is None else num_chunks + chunk_offset
for chunk_index in range(chunk_offset, stop_chunk):
yield self._read_channel_data_chunk(f, data_objects, chunk_index, channel_path)
def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path):
""" Read data from a chunk for a single channel
"""
channel_data = RawChannelDataChunk.empty()
for obj in data_objects:
number_values = self._get_channel_number_values(obj, chunk_index)
if obj.path == channel_path:
channel_data = RawChannelDataChunk.channel_data(obj.read_values(file, number_values))
elif number_values == obj.number_values:
# Seek over data for other channel data
file.seek(obj.data_size, os.SEEK_CUR)
else:
# In last chunk with reduced chunk size
if obj.data_type.size is None:
# Type is unsized (eg. string), try reading number of values
obj.read_values(file, number_values)
else:
file.seek(obj.data_type.size * number_values, os.SEEK_CUR)
return channel_data
def __init__(self, tdms_file, group, raw_data_chunk, channel_offsets):
self.name = group.name
self._channels = OrderedDict(
(channel.name, ChannelDataChunk(
channel,
raw_data_chunk.channel_data.get(channel.path, RawChannelDataChunk.empty()),
channel_offsets[channel.path]))
for channel in group.channels())
def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path):
""" Read data from a chunk for a single channel
"""
# In the base case we can read data for all channels
# and then select only the requested channel.
# Derived classes can implement more optimised reading.
data_chunk = self._read_data_chunk(file, data_objects, chunk_index)
try:
return data_chunk.channel_data[channel_path]
except KeyError:
return RawChannelDataChunk.empty()