Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test2_ruptures1D():
n_regimes = 5
n_samples = 500
# Piecewise constant signal
signal, chg_pts = pw_constant(n=n_samples, clusters=n_regimes,
min_size=50, noisy=True, snr=0.1)
func_to_minimize = gaussmean(signal) # - log likelihood
pen = 10
pe = Pelt(func_to_minimize, penalty=pen,
n=signal.shape[0], K=0, min_size=1)
pe.fit()
# Piecewise linear signal
signal, chg_pts = pw_linear(n=n_samples, clusters=n_regimes,
min_size=50, noisy=True, snr=0.1)
func_to_minimize = linear_mse(signal) # mean squared error
for pen in np.linspace(0.1, 100, 20):
pe = Pelt(func_to_minimize, penalty=pen, n=signal.shape[0], K=0)
pe.fit()
def test1_ruptures1D():
n_regimes = 5
n_samples = 500
# Piecewise constant signal
signal, chg_pts = pw_constant(n=n_samples, clusters=n_regimes,
min_size=50, noisy=True, snr=0.1)
func_to_minimize = gaussmean(signal) # - log likelihood
for pen in np.linspace(0.1, 100, 20):
pe = Pelt(func_to_minimize, penalty=pen, n=signal.shape[0], K=0)
pe.fit()
# Piecewise linear signal
signal, chg_pts = pw_linear(n=n_samples, clusters=n_regimes,
min_size=50, noisy=True, snr=0.1)
func_to_minimize = linear_mse(signal) # mean squared error
for pen in np.linspace(0.1, 100, 20):
pe = Pelt(func_to_minimize, penalty=pen,
n=signal.shape[0], K=0, min_size=3)
pe.fit()
def test4_ruptures1D():
n_regimes = 5
n_samples = 500
# Piecewise constant signal
signal, chg_pts = pw_constant(n=n_samples, clusters=n_regimes,
min_size=50, noisy=True, snr=0.001)
func_to_minimize = gaussmean(signal) # - log likelihood
pen = 50
pe = Pelt(func_to_minimize, penalty=pen, n=signal.shape[0], K=0)
my_chg_pts = pe.fit()
assert np.array_equal(np.sort(chg_pts), np.sort(my_chg_pts))
def signal_bkps_5D():
signal, bkps = pw_constant(n_features=5)
return signal, bkps
def signal_bkps_1D_noisy():
signal, bkps = pw_constant(n_features=1, noise_std=1)
return signal, bkps
product([pw_constant],
range(20, 1000, 200),
range(1, 4),
[2, 5, 3],
[None, 1, 2]))
def test_constant(func, n_samples, n_features, n_bkps, noise_std):
signal, bkps = func(
n_samples=n_samples, n_features=n_features, n_bkps=n_bkps, noise_std=noise_std)
assert signal.shape == (n_samples, n_features)
assert len(bkps) == n_bkps + 1
assert bkps[-1] == n_samples
def signal_bkps():
n_samples = 300
n_regimes = 3
dim = 3
signal, bkps = pw_constant(n=n_samples,
clusters=n_regimes,
noisy=True,
dim=dim,
snr=.01)
return signal, bkps
def signal_bkps_5D_noisy():
signal, bkps = pw_constant(n_features=5, noise_std=1)
return signal, bkps
def signal_bkps():
signal, bkps = pw_constant()
return signal, bkps
def pw_linear(n_samples=200, n_features=1, n_bkps=3, noise_std=None):
"""
Return piecewise linear signal and the associated changepoints.
Args:
n_samples (int, optional): signal length
n_features (int, optional): number of covariates
n_bkps (int, optional): number of change points
noise_std (float, optional): noise std. If None, no noise is added
Returns:
tuple: signal of shape (n_samples, n_features+1), list of breakpoints
"""
covar = normal(size=(n_samples, n_features))
linear_coeff, bkps = pw_constant(n_samples=n_samples,
n_bkps=n_bkps,
n_features=n_features,
noise_std=None)
var = np.sum(linear_coeff * covar, axis=1)
if noise_std is not None:
var += normal(scale=noise_std, size=var.shape)
signal = np.c_[var, covar]
return signal, bkps