import biosppy
import neurokit2 as nk
import numpy as np


def test_eda_clean():

    sampling_rate = 1000

    eda = nk.eda_simulate(
        duration=30, sampling_rate=sampling_rate, scr_number=6, noise=0.01, drift=0.01, random_state=42
    )
    clean = nk.eda_clean(eda, sampling_rate=sampling_rate)
    assert len(clean) == len(eda)

    # Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/master/biosppy/signals/eda.py)
    eda_biosppy = nk.eda_clean(eda, sampling_rate=sampling_rate, method="biosppy")
    original, _, _ = biosppy.tools.filter_signal(
        signal=eda, ftype="butter", band="lowpass", order=4, frequency=5, sampling_rate=sampling_rate
    )
    original, _ = biosppy.tools.smoother(signal=original, kernel="boxzen", size=int(0.75 * sampling_rate), mirror=True)
    # pd.DataFrame({"our": eda_biosppy, "biosppy": original}).plot()
    assert np.allclose((eda_biosppy - original).mean(), 0, atol=1e-5)

# Fragment from an RSP-cleaning test; assumes `rsp` (the raw respiration signal),
# `sampling_rate`, and `khodadad2018` (nk.rsp_clean output with method="khodadad2018")
# are defined earlier in that test. `rsp_biosppy` is computed first so the spectral
# check below can use it.
rsp_biosppy = nk.rsp_clean(rsp, sampling_rate=sampling_rate, method="biosppy")

# Check if filter was applied.
fft_raw = np.abs(np.fft.rfft(rsp))
fft_khodadad2018 = np.abs(np.fft.rfft(khodadad2018))
fft_biosppy = np.abs(np.fft.rfft(rsp_biosppy))
freqs = np.fft.rfftfreq(len(rsp), 1 / sampling_rate)
assert np.sum(fft_raw[freqs > 3]) > np.sum(fft_khodadad2018[freqs > 3])
assert np.sum(fft_raw[freqs < 0.05]) > np.sum(fft_khodadad2018[freqs < 0.05])
assert np.sum(fft_raw[freqs > 0.35]) > np.sum(fft_biosppy[freqs > 0.35])
assert np.sum(fft_raw[freqs < 0.1]) > np.sum(fft_biosppy[freqs < 0.1])

# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/master/biosppy/signals/resp.py#L62)
original, _, _ = biosppy.tools.filter_signal(
    signal=rsp, ftype="butter", band="bandpass", order=2, frequency=[0.1, 0.35], sampling_rate=sampling_rate
)
original = nk.signal_detrend(original, order=0)
assert np.allclose((rsp_biosppy - original).mean(), 0, atol=1e-6)

# Fragment from an ECG-cleaning test; assumes `sampling_rate` and `noise` are
# defined earlier in that test's setup.
ecg = nk.ecg_simulate(sampling_rate=sampling_rate, noise=noise)
ecg_cleaned_nk = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="neurokit")
assert ecg.size == ecg_cleaned_nk.size

# Assert that a highpass filter with a 0.5 Hz lowcut was applied.
fft_raw = np.abs(np.fft.rfft(ecg))
fft_nk = np.abs(np.fft.rfft(ecg_cleaned_nk))
freqs = np.fft.rfftfreq(ecg.size, 1 / sampling_rate)
assert np.sum(fft_raw[freqs < 0.5]) > np.sum(fft_nk[freqs < 0.5])

# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69)
ecg_biosppy = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="biosppy")
original, _, _ = biosppy.tools.filter_signal(
    signal=ecg,
    ftype="FIR",
    band="bandpass",
    order=int(0.3 * sampling_rate),
    frequency=[3, 45],
    sampling_rate=sampling_rate,
)
assert np.allclose((ecg_biosppy - original).mean(), 0, atol=1e-6)
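
# The assertions above share one idea: if a filter was really applied, the cleaned
# signal should hold less spectral energy than the raw signal inside the stop band.
# A minimal standalone sketch of that check (NumPy only; `stopband_energy`, `raw`,
# and `cleaned` are illustrative names, not part of the original tests):
def stopband_energy(signal, sampling_rate, fmin=None, fmax=None):
    # Sum of FFT magnitudes inside the (fmin, fmax) band.
    fft = np.abs(np.fft.rfft(signal))
    freqs = np.fft.rfftfreq(len(signal), 1 / sampling_rate)
    mask = np.ones_like(freqs, dtype=bool)
    if fmin is not None:
        mask &= freqs > fmin
    if fmax is not None:
        mask &= freqs < fmax
    return np.sum(fft[mask])

# Example: a 0.5 Hz high-pass should reduce the energy below 0.5 Hz.
# assert stopband_energy(raw, 1000, fmax=0.5) > stopband_energy(cleaned, 1000, fmax=0.5)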


def test_emg_clean():

    sampling_rate = 1000

    emg = nk.emg_simulate(duration=20, sampling_rate=sampling_rate)
    emg_cleaned = nk.emg_clean(emg, sampling_rate=sampling_rate)
    assert emg.size == emg_cleaned.size

    # Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/emg.py)
    original, _, _ = biosppy.tools.filter_signal(
        signal=emg, ftype="butter", band="highpass", order=4, frequency=100, sampling_rate=sampling_rate
    )
    emg_cleaned_biosppy = nk.signal_detrend(original, order=0)
    assert np.allclose((emg_cleaned - emg_cleaned_biosppy).mean(), 0, atol=1e-6)
import sys

from astropy.stats import LombScargle
from biosppy.signals import ecg

if len(sys.argv) != 3:
    print("Please provide the file containing the signal to be analyzed!")
else:
    f = open(sys.argv[1], "r")
    th = float(sys.argv[2])
    dataEcg = []
    for line in f:
        lineValues = line.split()
        dataEcg.append(float(lineValues[2]))

    # Perform QRS detection
    ecgOut = ecg.ecg(signal=dataEcg, sampling_rate=1000., show=False)

    # Calculate RR tachogram
    rPeaks = ecgOut[2]
    rrTachogram = []
    prevPeak = rPeaks[0]
    for peak in rPeaks[1:]:
        rrTachogram.append(peak - prevPeak)
        prevPeak = peak

    # Calculate median heartbeat template (element-wise average of the templates)
    templatesForCorrCoef = ecgOut[4]
    cleanTemplates = templatesForCorrCoef
    medianTemplate = [x / len(cleanTemplates) for x in [sum(x) for x in zip(*cleanTemplates)]]

    # Calculate correlation coefficients
    corrCoeffs = []
import sys

import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import pearsonr
from astropy.stats import LombScargle
from biosppy.signals import ecg

if len(sys.argv) != 2:
    print("Please provide the file containing the signal to be analyzed!")
else:
    f = open(sys.argv[1], "r")
    dataEcg = []
    for line in f:
        lineValues = line.split()
        dataEcg.append(float(lineValues[2]))

    # Perform QRS detection
    ecgOut = ecg.ecg(signal=dataEcg, sampling_rate=1000., show=False)

    # Calculate RR tachogram
    rPeaks = ecgOut[2]
    rrTachogram = []
    prevPeak = rPeaks[0]
    for peak in rPeaks[1:]:
        rrTachogram.append(peak - prevPeak)
        prevPeak = peak

    # Frequency grid (0 to 0.4 Hz), presumably for the Lomb-Scargle periodogram
    freq = np.linspace(0, 0.4, 1000)


def movingaverage(values, window):
    # Simple moving average: convolve with a uniform kernel of the given window size
    weights = np.repeat(1.0, window) / window
    sma = np.convolve(values, weights, 'valid')
    return sma
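
# Hypothetical continuation (the snippet above stops right after building the RR
# tachogram and the `freq` grid): the LombScargle import suggests an HRV periodogram.
# One way to compute it with astropy, assuming the RR intervals are in milliseconds
# (sample indices at 1000 Hz); `rrSeconds`, `beatTimes`, and `power` are illustrative names.
rrSeconds = np.asarray(rrTachogram, dtype=float) / 1000.0   # RR intervals in seconds
beatTimes = np.cumsum(rrSeconds)                            # time of each beat in seconds
power = LombScargle(beatTimes, rrSeconds).power(freq[1:])   # skip the 0 Hz bin
plt.plot(freq[1:], power)
plt.xlabel("Frequency (Hz)")
plt.ylabel("Lomb-Scargle power")
plt.show()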
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
from astropy.stats import LombScargle
from biosppy.signals import ecg

# Getting the data
f = open("sampleDataEcgEda.txt", "r")
dataEcg = []
for line in f:
    lineValues = line.split(",")
    dataEcg.append(float(lineValues[1]))

# Perform QRS detection
ecgOut = ecg.ecg(signal=dataEcg, sampling_rate=1000., show=False)

# Calculate RR tachogram
rPeaks = ecgOut[2]
rrTachogram = []
prevPeak = rPeaks[0]
for peak in rPeaks[1:]:
    rrTachogram.append(peak - prevPeak)
    prevPeak = peak

# Calculate median heartbeat template (element-wise average of the templates)
templatesForCorrCoef = ecgOut[4]
templates = templatesForCorrCoef
medianTemplate = [x / len(templates) for x in [sum(x) for x in zip(*templates)]]

# Calculate correlation coefficients
import sys

from biosppy.signals import ecg

# Getting the data
if len(sys.argv) != 3:
    print("Please provide the file containing the signal to be analyzed, as well as a threshold value!")
else:
    f = open(sys.argv[1], "r")
    th = float(sys.argv[2])
    dataEcg = []
    for line in f:
        lineValues = line.split()
        dataEcg.append(float(lineValues[2]))

    # Perform QRS detection
    ecgOut = ecg.ecg(signal=dataEcg, sampling_rate=1000., show=False)

    # Calculate RR tachogram
    rPeaks = ecgOut[2]
    rrTachogram = []
    prevPeak = rPeaks[0]
    for peak in rPeaks[1:]:
        rrTachogram.append(peak - prevPeak)
        prevPeak = peak

    # Calculate median heartbeat template (element-wise average of the templates)
    templatesForCorrCoef = ecgOut[4]
    cleanTemplates = templatesForCorrCoef
    medianTemplate = [x / len(cleanTemplates) for x in [sum(x) for x in zip(*cleanTemplates)]]

    # Calculate correlation coefficients
    corrCoeffs = []
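    # Hypothetical continuation (the original snippet is truncated right after
    # `corrCoeffs = []`): score each heartbeat template against the averaged template
    # with Pearson's r and keep only beats whose correlation exceeds the command-line
    # threshold `th`. The name `goodBeats` is illustrative, not from the source.
    from scipy.stats import pearsonr

    for template in cleanTemplates:
        r, _ = pearsonr(template, medianTemplate)
        corrCoeffs.append(r)
    goodBeats = [i for i, r in enumerate(corrCoeffs) if r >= th]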
import os

from biosppy import utils  # biosppy's path helper (provides normpath)


def zip_write(fid, files, recursive=True, root=None):
    # NOTE: signature and docstring header inferred from the body below; the original
    # fragment started mid-docstring.
    """Write files to a zip archive.

    Parameters
    ----------
    fid : zipfile.ZipFile
        Open archive to write into.
    files : iterable
        Paths of the files (and directories) to add.
    recursive : bool, optional
        If True, sub-directories and sub-folders are also written to the
        archive.
    root : str, optional
        Relative folder path.

    Notes
    -----
    * Ignores non-existent files and directories.

    """
    if root is None:
        root = ''

    for item in files:
        fpath = utils.normpath(item)
        if not os.path.exists(fpath):
            continue
        # relative archive name
        arcname = os.path.join(root, os.path.split(fpath)[1])
        # write
        fid.write(fpath, arcname)
        # recur into sub-directories
        if recursive and os.path.isdir(fpath):
            rfiles = [os.path.join(fpath, subitem)
                      for subitem in os.listdir(fpath)]
            zip_write(fid, rfiles, recursive=recursive, root=arcname)
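
# Hypothetical usage sketch for the helper above (the archive name and the "data"
# folder are illustrative): open a standard-library zipfile.ZipFile for writing and
# let zip_write pack the folder recursively.
import zipfile

with zipfile.ZipFile("records.zip", "w") as fid:
    zip_write(fid, ["data"], recursive=True)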