Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def save_all_subplots(self):
    """Save the signals of every sub-window into a single measurement file.

    A save-file dialog asks for the output path. When a path is chosen, the
    ``plot.signals`` of each sub-window widget in ``self.mdi_area`` are
    appended to a new ``MDF`` object, which is then saved to that path,
    overwriting any existing file.
    """
    chosen_path, _selected_filter = QtWidgets.QFileDialog.getSaveFileName(
        self,
        "Select output measurement file",
        "",
        "MDF version 4 files (*.mf4)",
    )

    # nothing to do when no output file name was provided
    if not chosen_path:
        return

    with MDF() as output:
        for sub_window in self.mdi_area.subWindowList():
            output.append(sub_window.widget().plot.signals)
        output.save(chosen_path, overwrite=True)
time_from_zero : bool
start time stamps from 0s in the cut measurement
Returns
-------
out : MDF
new MDF object
"""
if version is None:
version = self.version
else:
version = validate_version_argument(version)
out = MDF(version=version)
if whence == 1:
timestamps = []
for i, group in enumerate(self.groups):
fragment = next(self._load_data(group))
master = self.get_master(i, fragment)
if master.size:
timestamps.append(master[0])
del master
if timestamps:
first_timestamp = np.amin(timestamps)
else:
first_timestamp = 0
if start is not None:
sync the files based on the start of measurement, default *True*
Returns
-------
stacked : MDF
new *MDF* object with stacked channels
"""
if not files:
raise MdfException("No files given for stack")
version = validate_version_argument(version)
callback = kwargs.get("callback", None)
stacked = MDF(version=version, callback=callback)
files_nr = len(files)
if callback:
callback(0, files_nr)
if sync:
timestamps = []
for file in files:
if isinstance(file, MDF):
timestamps.append(file.header.start_time)
else:
with open(file, "rb") as mdf:
mdf.seek(64)
blk_id = mdf.read(2)
if blk_id == b"HD":
def run_thread_with_progress(
    widget, target, kwargs, factor=100, offset=0, progress=None
):
    """Run *target* in a ``WorkerThread`` and mirror its progress in a dialog.

    The worker is polled every 0.1 s. While it is alive, a cancel request on
    *progress* sets ``_terminate`` on ``MDF``, ``MDF2``, ``MDF3`` and ``MDF4``;
    otherwise the value of ``widget.progress`` is scaled and pushed to
    *progress*. Once the worker has stopped after a cancel request, the
    ``_terminate`` flags are cleared again.

    Parameters
    ----------
    widget : object
        object exposing a ``progress`` attribute, read here as either *None*
        or an indexable ``(current, total)`` pair
    target : callable
        callable handed to ``WorkerThread``
    kwargs : dict
        keyword arguments handed to ``WorkerThread`` together with *target*
    factor : int
        scale applied to the ``current / total`` ratio, default 100
    offset : int
        value added to the scaled ratio before it is displayed, default 0
    progress : object
        dialog-like object providing ``wasCanceled()`` and ``setValue()``;
        when *None* nothing is displayed and cancellation is never polled

    Notes
    -----
    NOTE(review): callers assign the result of this function, but no return
    statement is part of this block - confirm the return value against the
    complete definition.

    """
    mdf_classes = (MDF, MDF2, MDF3, MDF4)
    termination_request = False

    thr = WorkerThread(target=target, kwargs=kwargs)
    thr.start()

    # Wait for the first progress report, but stop waiting as soon as the
    # worker is gone: a target that finishes (or fails) before reporting
    # anything would otherwise keep this loop spinning forever.
    while widget.progress is None and thr.is_alive():
        sleep(0.1)

    while thr.is_alive():
        # *progress* defaults to None, so it must be checked before use
        termination_request = progress is not None and progress.wasCanceled()
        if termination_request:
            # presumably polled by the MDF classes to abort running work
            for mdf_class in mdf_classes:
                mdf_class._terminate = True
        else:
            # read the attribute once so the check and the use see the same value
            current = widget.progress
            if current is not None and progress is not None:
                progress.setValue(int(current[0] / current[1] * factor) + offset)
        sleep(0.1)

    if termination_request:
        # clear the flags so that later operations are not aborted as well
        for mdf_class in mdf_classes:
            mdf_class._terminate = False
progress = setup_progress(
parent=self,
title="Converting measurements",
message=f'Converting "{count}" files to {version}',
icon_name="convert",
)
files = self._prepare_files(progress)
source_files = [Path(self.files_list.item(row).text()) for row in range(count)]
for i, (file, source_file) in enumerate(zip(files, source_files)):
progress.setLabelText(f"Converting file {i+1} of {count} to {version}")
if not isinstance(file, MDF):
# open file
target = MDF
kwargs = {
"name": file,
"callback": self.update_progress,
}
mdf = run_thread_with_progress(
self,
target=target,
kwargs=kwargs,
factor=0,
offset=int(i * delta),
progress=progress,
)
title="Converting measurements",
message=f'Converting "{count}" files to {version}',
icon_name="convert",
)
files = self._prepare_files(progress)
source_files = [Path(self.files_list.item(row).text()) for row in range(count)]
for i, (file, source_file) in enumerate(zip(files, source_files)):
progress.setLabelText(f"Converting file {i+1} of {count} to {version}")
if not isinstance(file, MDF):
# open file
target = MDF
kwargs = {
"name": file,
"callback": self.update_progress,
}
mdf = run_thread_with_progress(
self,
target=target,
kwargs=kwargs,
factor=0,
offset=int(i * delta),
progress=progress,
)
else:
mdf = file
encodings = []
included_channel_names = []
if add_samples_origin:
origin_conversion = {}
for i, mdf in enumerate(files):
origin_conversion[f'val_{i}'] = i
if isinstance(mdf, MDF):
origin_conversion[f'text_{i}'] = str(mdf.name)
else:
origin_conversion[f'text_{i}'] = str(mdf)
origin_conversion = from_dict(origin_conversion)
for mdf_index, (offset, mdf) in enumerate(zip(offsets, files)):
if not isinstance(mdf, MDF):
mdf = MDF(mdf)
if mdf_index == 0:
last_timestamps = [None for gp in mdf.groups]
groups_nr = len(mdf.groups)
cg_nr = -1
for i, group in enumerate(mdf.groups):
included_channels = mdf._included_channels(i)
if mdf_index == 0:
included_channel_names.append(
[group.channels[k].name for k in included_channels]
)
else:
names = [group.channels[k].name for k in included_channels]
if names != included_channel_names[i]:
versions.append("4.00")
blk_id += mdf.read(2)
if blk_id == b"##HD":
header = HeaderV4
else:
raise MdfException(f'"{file}" is not a valid MDF file')
header = header(address=64, stream=mdf)
oldest = header.start_time
offsets = [0 for _ in files]
version = validate_version_argument(version)
merged = MDF(version=version, callback=callback)
merged.header.start_time = oldest
encodings = []
included_channel_names = []
if add_samples_origin:
origin_conversion = {}
for i, mdf in enumerate(files):
origin_conversion[f'val_{i}'] = i
if isinstance(mdf, MDF):
origin_conversion[f'text_{i}'] = str(mdf.name)
else:
origin_conversion[f'text_{i}'] = str(mdf)
origin_conversion = from_dict(origin_conversion)
Parameters
----------
name : str | pathlib.Path
file name
Returns
-------
name : str
scrambled file name
"""
name = Path(name)
mdf = MDF(name)
texts = {}
callback = kwargs.get("callback", None)
if callback:
callback(0, 100)
count = len(mdf.groups)
if mdf.version >= "4.00":
ChannelConversion = ChannelConversionV4
stream = mdf._file
if mdf.header.comment_addr:
stream.seek(mdf.header.comment_addr + 8)
size = UINT64_u(stream.read(8))[0] - 24
Returns
-------
mdf : MDF
new *MDF* with resampled channels
"""
if version is None:
version = self.version
else:
version = validate_version_argument(version)
interpolation_mode = self._integer_interpolation
mdf = MDF(version=version)
mdf.header.start_time = self.header.start_time
groups_nr = len(self.groups)
if self._callback:
self._callback(0, groups_nr)
# walk through all groups and get all channels
cg_nr = -1
for i, group in enumerate(self.groups):
if version < "4.00":
encodings = []
included_channels = self._included_channels(i)
if included_channels:
cg_nr += 1