# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def from_group(group, time_index=False, absolute_time=False, scaled_data=True):
    """Convert a TDMS group object to a DataFrame.

    DataFrame columns are named using the channel names.

    :param group: Group object to convert.
    :param time_index: Whether to include a time index for the dataframe.
    :param absolute_time: If time_index is true, whether the time index
        values are absolute times or relative to the start time.
    :param scaled_data: By default the scaled data will be used.
        Set to False to use raw unscaled data.
    :return: The TDMS object data.
    :rtype: pandas.DataFrame
    """
    # Preserve channel order while mapping column names to channel objects
    columns = OrderedDict()
    for channel in group.channels():
        columns[channel.name] = channel
    return _channels_to_dataframe(columns, time_index, absolute_time, scaled_data)
# NOTE(review): orphaned fragment — this appears to be the tail of
# TdmsFile._read_file (an intact copy of its start exists further down in
# this file). Its enclosing `def` is not visible here and the indentation
# in this chunk looks mangled; code below is left byte-identical.
except KeyError:
pass
# Walk all object paths in the file metadata and sort them into
# root / group / channel objects.
for (path_string, obj) in tdms_reader.object_metadata.items():
properties = object_properties[path_string]
path = ObjectPath.from_string(path_string)
if path.is_root:
pass
elif path.is_group:
group_properties[path.group] = properties
else:
# Object is a channel
try:
channel_group_properties = object_properties[path.group_path()]
except KeyError:
# Channel's group has no properties object of its own
channel_group_properties = OrderedDict()
channel = TdmsChannel(
path, obj.data_type, obj.scaler_data_types, obj.num_values,
properties, channel_group_properties, self._properties,
tdms_reader, self._raw_timestamps, self._memmap_dir)
if path.group in group_channels:
group_channels[path.group].append(channel)
else:
group_channels[path.group] = [channel]
# Create group objects containing channels and properties
for group_name, properties in group_properties.items():
try:
channels = group_channels[group_name]
except KeyError:
# Group with properties but no channels
channels = []
group_path = ObjectPath(group_name)
def _channels_to_dataframe(channels_to_export, time_index=False, absolute_time=False, scaled_data=True):
import pandas as pd
dataframe_dict = OrderedDict()
for column_name, channel in channels_to_export.items():
index = channel.time_track(absolute_time) if time_index else None
if scaled_data:
dataframe_dict[column_name] = pd.Series(data=_array_for_pd(channel.data), index=index)
elif channel.scaler_data_types:
# Channel has DAQmx raw data
for scale_id, raw_data in channel.raw_scaler_data.items():
scaler_column_name = column_name + "[{0:d}]".format(scale_id)
dataframe_dict[scaler_column_name] = pd.Series(data=raw_data, index=index)
else:
# Raw data for normal TDMS file
dataframe_dict[column_name] = pd.Series(data=_array_for_pd(channel.raw_data), index=index)
return pd.DataFrame.from_dict(dataframe_dict)
Setting this to true will read timestamps as a custom TdmsTimestamp type.
:param memmap_dir: The directory to store memory mapped data files in,
or None to read data into memory. The data files are created
as temporary files and are deleted when the channel data is no
longer used. tempfile.gettempdir() can be used to get the default
temporary file directory.
:param read_metadata_only: If this parameter is enabled then only the
metadata of the TDMS file will read.
:param keep_open: Keeps the file open so data can be read if only metadata
is read initially.
"""
# NOTE(review): orphaned fragment — this is the tail of a constructor
# (presumably TdmsFile.__init__); its `def` line is outside this chunk.
self._memmap_dir = memmap_dir
self._raw_timestamps = raw_timestamps
# Groups and file-level properties are filled in by _read_file below
self._groups = OrderedDict()
self._properties = OrderedDict()
self._channel_data = {}
self.data_read = False
self._reader = TdmsReader(file)
try:
self._read_file(self._reader, read_metadata_only)
finally:
# Close the reader unless the caller wants to read data lazily later
if not keep_open:
self._reader.close()
def objects(self):
    """ (Deprecated) A dictionary of objects in the TDMS file, where the keys are the object paths.
    """
    _deprecated(
        "TdmsFile.objects",
        "Use TdmsFile.groups() to access all groups in the file, " +
        "and group.channels() to access all channels in a group.")
    result = OrderedDict()
    # The root object is keyed by the root path string
    result[str(ObjectPath())] = RootObject(self._properties)
    for group in self.groups():
        result[group.path] = group
        result.update((channel.path, channel) for channel in group.channels())
    return result
def __init__(self, tdms_file, group, raw_data_chunk, channel_offsets):
    """Wrap one chunk of raw file data as per-channel data chunks for a group.

    :param tdms_file: The parent TDMS file object (unused here but kept
        for interface compatibility).
    :param group: The group whose channels this chunk covers.
    :param raw_data_chunk: Raw chunk data keyed by channel path.
    :param channel_offsets: Mapping from channel path to data offset.
    """
    self.name = group.name
    self._channels = OrderedDict()
    for channel in group.channels():
        # Channels absent from this chunk get an empty raw data chunk
        raw_chunk = raw_data_chunk.channel_data.get(
            channel.path, RawChannelDataChunk.empty())
        self._channels[channel.name] = ChannelDataChunk(
            channel, raw_chunk, channel_offsets[channel.path])
def _read_file(self, tdms_reader, read_metadata_only):
# Read all file metadata up front; groups and channels are built from it
tdms_reader.read_metadata()
# Use object metadata to build group and channel objects
group_properties = OrderedDict()
group_channels = OrderedDict()
# Convert raw property values (e.g. timestamps) once per object path
object_properties = {
path_string: self._convert_properties(obj.properties)
for path_string, obj in tdms_reader.object_metadata.items()}
try:
# '/' is the root object; its properties are the file-level properties
self._properties = object_properties['/']
except KeyError:
# No root object in the file — keep the existing (empty) properties
pass
for (path_string, obj) in tdms_reader.object_metadata.items():
properties = object_properties[path_string]
path = ObjectPath.from_string(path_string)
if path.is_root:
pass
elif path.is_group:
group_properties[path.group] = properties
else:
# NOTE(review): this definition is truncated in this chunk — the body of
# the channel branch continues outside the visible source.
def _convert_properties(self, properties):
    """Return a copy of a properties mapping with timestamp values converted.

    TdmsTimestamp values are converted to numpy datetime64 unless raw
    timestamps were requested when the file was opened; all other values
    pass through unchanged. Key order is preserved.
    """
    converted = OrderedDict()
    for key, value in properties.items():
        if isinstance(value, TdmsTimestamp) and not self._raw_timestamps:
            # Raw timestamps were not requested, so use datetime64
            converted[key] = value.as_datetime64()
        else:
            converted[key] = value
    return converted