Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _read_channel_data(self, offset=0, length=None):
    """Read raw data for this channel into a freshly allocated receiver.

    :param offset: Index passed to the reader as the starting position;
        must not be negative.
    :param length: Upper bound on the number of values to allocate for,
        also forwarded to the reader. ``None`` means everything from
        ``offset`` up to ``len(self)``.
    :returns: The object produced by ``get_data_receiver`` after every
        chunk yielded by the reader has been appended to it.
    :raises ValueError: If ``offset`` is negative, or if ``length`` is
        given and negative.
    """
    if offset < 0:
        raise ValueError("offset must be non-negative")
    if length is not None and length < 0:
        raise ValueError("length must be non-negative")

    with Timer(log, "Allocate space for channel"):
        # Size the receiver for the values left after the offset, capped
        # by the requested length and never below zero (an offset past
        # the end of the channel would otherwise give a negative count).
        remaining = len(self) - offset
        num_values = max(
            0, remaining if length is None else min(length, remaining))
        receiver = get_data_receiver(
            self, num_values, self._raw_timestamps, self._memmap_dir)

    with Timer(log, "Read data for channel"):
        # Stream the chunks from the reader and append each piece.
        raw_chunks = self._reader.read_raw_data_for_channel(
            self.path, offset, length)
        for raw_chunk in raw_chunks:
            # The two payload kinds are checked independently, so a chunk
            # carrying both has both appended.
            if raw_chunk.data is not None:
                receiver.append_data(raw_chunk.data)
            if raw_chunk.scaler_data is not None:
                for scaler_key, scaler_values in raw_chunk.scaler_data.items():
                    receiver.append_scaler_data(scaler_key, scaler_values)

    return receiver
def _read_data(self, tdms_reader):
with Timer(log, "Allocate space"):
# Allocate space for data
for group in self.groups():
for channel in group.channels():
self._channel_data[channel.path] = get_data_receiver(
channel, len(channel), self._raw_timestamps, self._memmap_dir)
with Timer(log, "Read data"):
# Now actually read all the data
for chunk in tdms_reader.read_raw_data():
for (path, data) in chunk.channel_data.items():
channel_data = self._channel_data[path]
if data.data is not None:
channel_data.append_data(data.data)
elif data.scaler_data is not None:
for scaler_id, scaler_data in data.scaler_data.items():
channel_data.append_scaler_data(scaler_id, scaler_data)
for group in self.groups():
for channel in group.channels():
channel_data = self._channel_data[channel.path]