Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _read_data(self, tdms_reader):
    """Load the raw data of every channel through ``tdms_reader``.

    A data receiver is created for each channel of each group, every chunk
    yielded by ``tdms_reader.read_raw_data()`` is appended to the receiver
    registered under the chunk's channel path, and each receiver is then
    handed to its channel via ``_set_raw_data``.
    """
    with Timer(log, "Allocate space"):
        # One receiver per channel, sized from the channel's own length.
        for group in self.groups():
            for channel in group.channels():
                receiver = get_data_receiver(
                    channel, len(channel), self._raw_timestamps, self._memmap_dir)
                self._channel_data[channel.path] = receiver

    with Timer(log, "Read data"):
        # Stream the chunks into the receivers allocated above.
        for chunk in tdms_reader.read_raw_data():
            for path, raw in chunk.channel_data.items():
                receiver = self._channel_data[path]
                if raw.data is not None:
                    receiver.append_data(raw.data)
                    continue
                # Scaler data is only consulted when there is no plain data.
                if raw.scaler_data is None:
                    continue
                for scaler_id, scaler_values in raw.scaler_data.items():
                    receiver.append_scaler_data(scaler_id, scaler_values)

    # NOTE(review): placed after the "Read data" timer; the flattened source
    # does not show whether this loop was inside it -- confirm.
    for group in self.groups():
        for channel in group.channels():
            receiver = self._channel_data[channel.path]
            if receiver is not None:
                channel._set_raw_data(receiver)
def _read_channel_data(self, offset=0, length=None):
    """Read a range of this channel's raw data into a new data receiver.

    :param offset: Index of the first value to read; must not be negative.
    :param length: Maximum number of values to read, or None to read
        everything from ``offset`` onwards; must not be negative.
    :returns: The data receiver that the read chunks were appended to.
    :raises ValueError: If ``offset`` or ``length`` is negative.
    """
    if offset < 0:
        raise ValueError("offset must be non-negative")
    if length is not None and length < 0:
        raise ValueError("length must be non-negative")

    with Timer(log, "Allocate space for channel"):
        # Never allocate for more values than remain after the offset, and
        # clamp to zero when the offset lies beyond the end of the channel.
        remaining = len(self) - offset
        requested = remaining if length is None else min(length, remaining)
        receiver = get_data_receiver(
            self, max(0, requested), self._raw_timestamps, self._memmap_dir)

    with Timer(log, "Read data for channel"):
        chunks = self._reader.read_raw_data_for_channel(self.path, offset, length)
        for chunk in chunks:
            # Plain data and scaler data are checked independently.
            if chunk.data is not None:
                receiver.append_data(chunk.data)
            if chunk.scaler_data is not None:
                for scaler_id, scaler_values in chunk.scaler_data.items():
                    receiver.append_scaler_data(scaler_id, scaler_values)

    return receiver
def read_metadata(self):
""" Read all metadata and structure information from a TdmsFile
"""
self._ensure_open()
if self._index_file_path is not None:
reading_index_file = True
file = open(self._index_file_path, 'rb')
else:
reading_index_file = False
file = self._file
self._segments = []
segment_position = 0
try:
with Timer(log, "Read metadata"):
# Read metadata first to work out how much space we need
previous_segment = None
while True:
start_position = file.tell()
try:
segment = self._read_segment_metadata(
file, segment_position, previous_segment, reading_index_file)
except EOFError:
# We've finished reading the file
break
self._update_object_metadata(segment)
self._update_object_properties(segment)
self._segments.append(segment)
previous_segment = segment
def _read_channel_data(self, offset=0, length=None):
    """Read a range of this channel's raw data into a new data receiver.

    :param offset: Index of the first value to read; must not be negative.
    :param length: Maximum number of values to read, or None to read
        everything from ``offset`` onwards; must not be negative.
    :returns: The data receiver holding the values that were read.
    :raises ValueError: If ``offset`` or ``length`` is negative.
    """
    if offset < 0:
        raise ValueError("offset must be non-negative")
    if length is not None and length < 0:
        raise ValueError("length must be non-negative")

    with Timer(log, "Allocate space for channel"):
        # Allocate space for data
        if length is None:
            num_values = len(self) - offset
        else:
            num_values = min(length, len(self) - offset)
        # An offset beyond the end of the channel would give a negative
        # count, so clamp it to zero before allocating.
        num_values = max(0, num_values)
        channel_data = get_data_receiver(
            self, num_values, self._raw_timestamps, self._memmap_dir)

    with Timer(log, "Read data for channel"):
        # Now actually read all the data
        for chunk in self._reader.read_raw_data_for_channel(self.path, offset, length):
            if chunk.data is not None:
                channel_data.append_data(chunk.data)
            if chunk.scaler_data is not None:
                for scaler_id, scaler_data in chunk.scaler_data.items():
                    channel_data.append_scaler_data(scaler_id, scaler_data)

    # Return the filled receiver; without this the function would return
    # None and the data that was just read would be discarded.
    return channel_data
def _read_data(self, tdms_reader):
with Timer(log, "Allocate space"):
# Allocate space for data
for group in self.groups():
for channel in group.channels():
self._channel_data[channel.path] = get_data_receiver(
channel, len(channel), self._raw_timestamps, self._memmap_dir)
with Timer(log, "Read data"):
# Now actually read all the data
for chunk in tdms_reader.read_raw_data():
for (path, data) in chunk.channel_data.items():
channel_data = self._channel_data[path]
if data.data is not None:
channel_data.append_data(data.data)
elif data.scaler_data is not None:
for scaler_id, scaler_data in data.scaler_data.items():
channel_data.append_scaler_data(scaler_id, scaler_data)