Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_read_demo(self):
print("MDF read tests")
ret = True
for enable in (True, False):
for mdf in Path(TestMDF.tempdir_demo.name).iterdir():
for inp in (mdf, BytesIO(mdf.read_bytes())):
with MDF(inp) as input_file:
if input_file.version == "2.00":
continue
print(mdf)
for name in set(input_file.channels_db) - {"time", "t"}:
if (
name.endswith("[0]")
or name.startswith("DI")
or "\\" in name
):
continue
signal = input_file.get(name)
try:
original_samples = CHANNELS_DEMO[name.split("\\")[0]]
except:
Path(TestMDF.tempdir.name) / "tmp1",
overwrite=True,
compression=compression,
)
outfile2 = MDF(input_file)
outfile2.configure(read_fragment_size=8000)
outfile2 = outfile2.cut(
start=105.1, stop=201, whence=whence, include_ends=False
).save(
Path(TestMDF.tempdir.name) / "tmp2",
overwrite=True,
compression=compression,
)
outfile3 = MDF(input_file)
outfile3.configure(read_fragment_size=8000)
outfile3 = outfile3.cut(
start=201.1, whence=whence, include_ends=False
).save(Path(TestMDF.tempdir.name) / "tmp3", overwrite=True)
outfile4 = MDF(input_file)
outfile4.configure(read_fragment_size=8000)
outfile4 = outfile4.cut(
start=7000, whence=whence, include_ends=False
).save(
Path(TestMDF.tempdir.name) / "tmp4",
overwrite=True,
compression=compression,
)
outfile = MDF.concatenate(
Path(TestMDF.tempdir.name) / "tmp4",
overwrite=True,
compression=compression,
)
outfile = MDF.concatenate(
[outfile0, outfile1, outfile2, outfile3, outfile4],
version=MDF(input_file).version,
sync=whence,
).save(
Path(TestMDF.tempdir.name) / "tmp_cut",
overwrite=True,
compression=compression,
)
with MDF(outfile) as mdf:
for i, group in enumerate(mdf.groups):
if i == 0:
v = np.ones(cycles, dtype=np.uint64)
for j in range(1, 20):
vals = mdf.get(group=i, index=j, samples_only=True)[
0
]
cond = np.array_equal(vals, v * (j - 1))
if not cond:
print(
i, j, vals, v * (j - 1), len(vals), len(v)
)
self.assertTrue(cond)
elif i == 1:
v = np.ones(cycles, dtype=np.int64)
start=201.1, whence=whence, include_ends=False
).save(Path(TestMDF.tempdir.name) / "tmp3", overwrite=True)
outfile4 = MDF(input_file)
outfile4.configure(read_fragment_size=8000)
outfile4 = outfile4.cut(
start=7000, whence=whence, include_ends=False
).save(
Path(TestMDF.tempdir.name) / "tmp4",
overwrite=True,
compression=compression,
)
outfile = MDF.concatenate(
[outfile0, outfile1, outfile2, outfile3, outfile4],
version=MDF(input_file).version,
sync=whence,
).save(
Path(TestMDF.tempdir.name) / "tmp_cut",
overwrite=True,
compression=compression,
)
with MDF(outfile) as mdf:
for i, group in enumerate(mdf.groups):
if i == 0:
v = np.ones(cycles, dtype=np.uint64)
for j in range(1, 20):
vals = mdf.get(group=i, index=j, samples_only=True)[
0
]
for input_file in Path(TestMDF.tempdir_general.name).iterdir():
for whence in (0, 1):
for compression in range(3):
print(input_file)
outfile0 = MDF(input_file)
outfile0.configure(read_fragment_size=8000)
outfile0 = outfile0.cut(
stop=-1, whence=whence, include_ends=False
).save(
Path(TestMDF.tempdir.name) / "tmp0",
overwrite=True,
compression=compression,
)
outfile1 = MDF(input_file)
outfile1.configure(read_fragment_size=8000)
outfile1 = outfile1.cut(
stop=105, whence=whence, include_ends=False
).save(
Path(TestMDF.tempdir.name) / "tmp1",
overwrite=True,
compression=compression,
)
outfile2 = MDF(input_file)
outfile2.configure(read_fragment_size=8000)
outfile2 = outfile2.cut(
start=105.1, stop=201, whence=whence, include_ends=False
).save(
Path(TestMDF.tempdir.name) / "tmp2",
overwrite=True,
# NOTE(review): this is the body of a test whose `def` line is not visible in
# this view; it uses `self`, so it belongs to a TestCase method.
#
# Build two signals of CHANNEL_LEN samples each. The second positional
# argument (np.arange(CHANNEL_LEN)) is presumably the timestamps array --
# confirm against the Signal constructor.
# Integer samples are drawn from [-2**9, 2**7) (randint's upper bound is exclusive).
sig_int = Signal(
np.random.randint(-2 ** 9, 2 ** 7, CHANNEL_LEN),
np.arange(CHANNEL_LEN),
name="Integer Channel",
unit="unit1",
)
# Float samples are drawn from [0.0, 1.0).
sig_float = Signal(
np.random.random(CHANNEL_LEN),
np.arange(CHANNEL_LEN),
name="Float Channel",
unit="unit2",
)
# Write both signals into a version 3.10 file under TestMDF23.tempdir.
with MDF(version="3.10") as mdf:
mdf.append([sig_int, sig_float], common_timebase=True)
outfile = mdf.save(Path(TestMDF23.tempdir.name) / "tmp", overwrite=True)
# Reopen the saved file and fetch each channel back by name.
with MDF(outfile) as mdf:
ret_sig_int = mdf.get(sig_int.name)
ret_sig_float = mdf.get(sig_float.name)
# The samples read back must equal the samples that were written.
self.assertTrue(np.array_equal(ret_sig_int.samples, sig_int.samples))
self.assertTrue(np.array_equal(ret_sig_float.samples, sig_float.samples))
def test_convert_demo(self):
print("MDF convert tests")
for out in SUPPORTED_VERSIONS:
for input_file in Path(TestMDF.tempdir_demo.name).iterdir():
if MDF(input_file).version == "2.00":
continue
print(input_file, out)
with MDF(input_file) as mdf:
outfile = mdf.convert(out).save(
Path(TestMDF.tempdir_demo.name) / "tmp", overwrite=True,
)
equal = True
with MDF(input_file) as mdf, MDF(outfile) as mdf2:
for name in set(mdf2.channels_db) - {"t", "time"}:
original = mdf.get(name)
converted = mdf2.get(name)
raw = mdf.get(name, raw=True)
if not np.array_equal(original.samples, converted.samples):
equal = False
print(
name,
*zip(raw.samples, original.samples, converted.samples),
outfile,
sep="\n",
)
1 / 0
if not np.array_equal(
original.timestamps, converted.timestamps
def test_attachment_blocks_wo_filename(self):
    """Round-trip an embedded attachment that was stored without a file name.

    The payload is attached with ``file_name=None`` and ``comment=None``,
    the file is saved as ``attachment.mf4`` under ``TestMDF4.tempdir`` and
    then reopened. Attachment 0 is extracted and the test asserts that the
    bytes come back unchanged and that the reported file name equals
    ``Path("bin.bin")``.
    """
    original_data = b"Testing attachemnt block\nTest line 1"

    # Open the writer as a context manager so it is closed as soon as the
    # file has been saved, matching how the other tests in this file use MDF.
    with MDF() as mdf:
        mdf.attach(
            original_data,
            file_name=None,
            comment=None,
            compression=True,
            mime=r"text/plain",
            embedded=True,
        )
        outfile = mdf.save(
            Path(TestMDF4.tempdir.name) / "attachment.mf4",
            overwrite=True,
        )

    # Read the saved file back and check the attachment survived intact.
    with MDF(outfile) as attachment_mdf:
        data, filename = attachment_mdf.extract_attachment(index=0)
        self.assertEqual(data, original_data)
        self.assertEqual(filename, Path("bin.bin"))
# Cut every file found in TestMDF.tempdir_array into three time ranges, save
# each part, then concatenate the parts back into a single "tmp_cut" file.
# NOTE(review): the part of this test that checks the concatenated result is
# not visible in this view; only the setup up to `equal = True` is shown.
def test_cut_arrays(self):
print("MDF cut big array files")
for input_file in Path(TestMDF.tempdir_array.name).iterdir():
# Both whence values (0 and 1) are exercised for every input file.
for whence in (0, 1):
print(input_file, whence)
# Part 1: everything up to stop=105.5.
outfile1 = MDF(input_file)
outfile1.configure(read_fragment_size=8000)
outfile1 = outfile1.cut(
stop=105.5, whence=whence, include_ends=False
).save(Path(TestMDF.tempdir.name) / "tmp1", overwrite=True)
# Part 2: from start=105.5 to stop=201.5.
outfile2 = MDF(input_file)
outfile2.configure(read_fragment_size=8000)
outfile2 = outfile2.cut(
start=105.5, stop=201.5, whence=whence, include_ends=False
).save(Path(TestMDF.tempdir.name) / "tmp2", overwrite=True)
# Part 3: everything from start=201.5 onwards.
outfile3 = MDF(input_file)
outfile3.configure(read_fragment_size=8000)
outfile3 = outfile3.cut(
start=201.5, whence=whence, include_ends=False
).save(Path(TestMDF.tempdir.name) / "tmp3", overwrite=True)
# Join the three saved parts, passing the input file's version as the
# second positional argument, and save the result as "tmp_cut".
outfile = MDF.concatenate(
[outfile1, outfile2, outfile3], MDF(input_file).version
).save(Path(TestMDF.tempdir.name) / "tmp_cut", overwrite=True)
# Flag initialised here; presumably cleared by the comparison code that
# follows outside this view -- TODO confirm.
equal = True
def generate_test_files(version="4.10"):
cycles = 3000
channels_count = 2000
mdf = MDF(version=version)
if version <= "3.30":
filename = r"test.mdf"
else:
filename = r"test.mf4"
if os.path.exists(filename):
return filename
t = np.arange(cycles, dtype=np.float64)
cls = v4b.ChannelConversion if version >= "4.00" else v3b.ChannelConversion
# no conversion
sigs = []
for i in range(channels_count):