Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_exceptions(signal_bkps, kernel):
signal, bkps = signal_bkps
n_regimes = len(bkps)
method = "dynp"
a = KernelMSE(method, kernel)
with pytest.raises(NotEnoughPoints):
a.fit(signal=signal, n_regimes=n_regimes, min_size=1)
method = "pelt"
a = KernelMSE(method, kernel)
with pytest.raises(NotEnoughPoints):
a.fit(signal=signal, penalty=1, min_size=1)
@raises(NotEnoughPoints)
def test2_ruptures1D():
n_regimes = 5
n_samples = 500
# Piecewise constant signal
signal, chg_pts = pw_constant(n=n_samples, clusters=n_regimes,
min_size=50, noisy=True, snr=0.1)
func_to_minimize = gaussmean(signal) # - log likelihood
pen = 10
pe = Pelt(func_to_minimize, penalty=pen,
n=signal.shape[0], K=0, min_size=1)
pe.fit()
# Piecewise linear signal
signal, chg_pts = pw_linear(n=n_samples, clusters=n_regimes,
def test_exceptions(signal_bkps, kernel):
signal, bkps = signal_bkps
n_regimes = len(bkps)
method = "dynp"
a = KernelMSE(method, kernel)
with pytest.raises(NotEnoughPoints):
a.fit(signal=signal, n_regimes=n_regimes, min_size=1)
method = "pelt"
a = KernelMSE(method, kernel)
with pytest.raises(NotEnoughPoints):
a.fit(signal=signal, penalty=1, min_size=1)
def error(self, start, end):
"""Return the approximation cost on the segment [start:end].
Args:
start (int): start of the segment
end (int): end of the segment
Returns:
float: segment cost
Raises:
NotEnoughPoints: when the segment is too short (less than ``'min_size'`` samples).
"""
if end - start < self.min_size:
raise NotEnoughPoints
sub = self.signal[start:end]
if self.signal.shape[1] > 1:
cov = np.cov(sub.T)
else:
cov = np.array([[sub.var()]])
_, val = slogdet(cov)
return val * (end - start)
def error(self, start, end):
"""Return the approximation cost on the segment [start:end].
Args:
start (int): start of the segment
end (int): end of the segment
Returns:
float: segment cost
Raises:
NotEnoughPoints: when the segment is too short (less than ``'min_size'`` samples).
"""
if end - start < self.min_size:
raise NotEnoughPoints
sub = self.signal[start:end]
med = np.median(sub, axis=0)
return abs(sub - med).sum()
Associated K (see Pelt): 0
Args:
start (int): first index of the segment (index included)
end (int): last index of the segment (index excluded)
Raises:
NotEnoughPoints: if there are not enough points to compute the cost
for the specified segment (here, at least 3 points).
Returns:
float: cost on the given segment.
"""
if end - start < 3: # we need at least 2 points
raise NotEnoughPoints
sig = self.signal[start:end]
# we regress over the time variable
tt = np.arange(start, end)
a = np.column_stack((tt, np.ones(tt.shape)))
assert a.shape[0] == sig.shape[0]
# doing the regression
res_lstsq = lstsq(a, sig)
assert res_lstsq[1].shape[0] == sig.shape[1]
# squared error
residuals = res_lstsq[1]
return residuals.sum()
"""Return squared error on the interval start:end
Detailled description
Args:
start (int): start index (inclusive)
end (int): end index (exclusive)
Returns:
float: error
Raises:
NotEnoughPoints: when not enough points
"""
if end - start < self.min_size:
raise NotEnoughPoints
if self.model in ["constantl2", "rbf"]:
sub_gram = self.gram[start:end, start:end]
cost = np.diagonal(sub_gram).sum()
cost -= sub_gram.sum() / (end - start)
elif self.model == "constantl1":
med = np.median(self.signal[start:end], axis=0)
cost = abs(self.signal[start:end] - med).sum()
return cost
def error(self, start, end):
"""Return the approximation cost on the segment [start:end].
Args:
start (int): start of the segment
end (int): end of the segment
Returns:
float: segment cost
Raises:
NotEnoughPoints: when the segment is too short (less than ``'min_size'`` samples).
"""
if end - start < self.min_size:
raise NotEnoughPoints
y, X = self.signal[start:end], self.covar[start:end]
_, residual, _, _ = lstsq(X, y, rcond=None)
return residual.sum()
def error(self, start, end):
"""Return the approximation cost on the segment [start:end].
Args:
start (int): start of the segment
end (int): end of the segment
Returns:
float: segment cost
Raises:
NotEnoughPoints: when the segment is too short (less than ``'min_size'`` samples).
"""
if end - start < self.min_size:
raise NotEnoughPoints
y, X = self.signal[start:end], self.covar[start:end]
_, residual, _, _ = lstsq(X, y, rcond=None)
return residual.sum()
def error(self, start, end):
"""Return the approximation cost on the segment [start:end].
Args:
start (int): start of the segment
end (int): end of the segment
Returns:
float: segment cost
Raises:
NotEnoughPoints: when the segment is too short (less than ``'min_size'`` samples).
"""
if end - start < self.min_size:
raise NotEnoughPoints
mean = np.reshape(np.mean(self.ranks[start:end], axis=0), (-1, 1))
return -(end - start) * mean.T @ self.inv_cov @ mean