Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# initialization
bkps = [self.n_samples]
stop = False
error = self.cost.error(0, self.n_samples)
while not stop:
stop = True
_, bkp = max((v, k) for k, v in enumerate(self.score, start=1)
if not any(abs(k - b) < self.width // 2 for b in bkps[:-1]))
if n_bkps is not None:
if len(bkps) - 1 < n_bkps:
stop = False
elif pen is not None:
new_error = sum(self.cost.error(start, end)
for start, end in pairwise(sorted([0, bkp] + bkps)))
gain = error - new_error
if gain > pen:
stop = False
error = sum(self.cost.error(start, end)
for start, end in pairwise([0] + bkps))
elif epsilon is not None:
if error > epsilon:
stop = False
error = sum(self.cost.error(start, end)
for start, end in pairwise([0] + bkps))
if not stop:
bkps.append(bkp)
bkps.sort()
return bkps
Args:
n_bkps (int): number of breakpoints to find before stopping.
penalty (float): penalty value (>0)
epsilon (float): reconstruction budget (>0)
Returns:
dict: partition dict {(start, end): cost value,...}
"""
leaves = list(self.leaves)
# bottom up fusion
stop = False
while not stop:
stop = True
leaves.sort(key=lambda n: n.start)
merged = (self.merge(left, right)
for left, right in pairwise(leaves))
# find segment to merge
try:
leaf = min(merged, key=lambda n: n.gain)
except ValueError: # if merged is empty (all nodes have been merged).
break
if n_bkps is not None:
if len(leaves) > n_bkps + 1:
stop = False
elif pen is not None:
if leaf.gain < pen:
stop = False
elif epsilon is not None:
if sum(leaf_tmp.val for leaf_tmp in leaves) < epsilon:
stop = False
if n_bkps is not None:
if len(bkps) - 1 < n_bkps:
stop = False
elif pen is not None:
new_error = sum(self.cost.error(start, end)
for start, end in pairwise(sorted([0, bkp] + bkps)))
gain = error - new_error
if gain > pen:
stop = False
error = sum(self.cost.error(start, end)
for start, end in pairwise([0] + bkps))
elif epsilon is not None:
if error > epsilon:
stop = False
error = sum(self.cost.error(start, end)
for start, end in pairwise([0] + bkps))
if not stop:
bkps.append(bkp)
bkps.sort()
return bkps
correlation /= inds * inds[::-1]
bkp = np.argmax(correlation) + 1
# orthogonal projection (matrix form)
# adj = np.zeros(self.gram.shape) # adjacency matrix
# for start, end in pairwise(sorted([0, bkp] + bkps)):
# duree = end - start
# adj[start:end, start:end] = np.ones(duree, duree) / duree
# gram_new = self.gram + adj @ self.gram @ adj - adj @ self.gram - self.gram @ adj
# csum = gram_new.cumsum(axis=0).cumsum(axis=1)
# orthogonal projection (vectorized form)
gram_new = self.gram.copy()
# cross product
cross_g = np.zeros(self.gram.shape)
for start, end in pairwise(sorted([0, bkp] + bkps)):
val = self.gram[:, start:end].mean(axis=1).reshape(-1, 1)
cross_g[:, start:end] = val
gram_new -= cross_g + cross_g.T
# products of segment means
for p, q in product(pairwise(sorted([0, bkp] + bkps)), repeat=2):
start1, end1 = p
start2, end2 = q
gram_new[start1:end1, start2:end2] += self.gram[
start1:end1, start2:end2].mean()
csum = gram_new.cumsum(axis=0).cumsum(axis=1)
# stopping criterion
stop = True
if n_bkps is not None:
if len(bkps) - 1 < n_bkps:
stop = False
# for start, end in pairwise(sorted([0, bkp] + bkps)):
# duree = end - start
# adj[start:end, start:end] = np.ones(duree, duree) / duree
# gram_new = self.gram + adj @ self.gram @ adj - adj @ self.gram - self.gram @ adj
# csum = gram_new.cumsum(axis=0).cumsum(axis=1)
# orthogonal projection (vectorized form)
gram_new = self.gram.copy()
# cross product
cross_g = np.zeros(self.gram.shape)
for start, end in pairwise(sorted([0, bkp] + bkps)):
val = self.gram[:, start:end].mean(axis=1).reshape(-1, 1)
cross_g[:, start:end] = val
gram_new -= cross_g + cross_g.T
# products of segment means
for p, q in product(pairwise(sorted([0, bkp] + bkps)), repeat=2):
start1, end1 = p
start2, end2 = q
gram_new[start1:end1, start2:end2] += self.gram[
start1:end1, start2:end2].mean()
csum = gram_new.cumsum(axis=0).cumsum(axis=1)
# stopping criterion
stop = True
if n_bkps is not None:
if len(bkps) - 1 < n_bkps:
stop = False
elif pen is not None:
if residual - csum[-1, -1] > pen:
stop = False
elif epsilon is not None:
if csum[-1, -1] > epsilon:
def membership_mat(bkps):
"""Return membership matrix for the given segmentation."""
n_samples = bkps[-1]
m_mat = np.zeros((n_samples, n_samples))
for start, end in pairwise([0] + bkps):
m_mat[start:end, start:end] = 1
return m_mat
res_list = list()
for ind in inds: # greedy search
res_tmp = 0
y_left, y_right = residual[:ind], residual[ind:]
x_left, x_right = self.covariates[:ind], self.covariates[ind:]
for x, y in zip((x_left, x_right), (y_left, y_right)):
# linear fit
_, res_sub, _, _ = lstsq(x, y)
# error on sub-signal
res_tmp += res_sub
res_list.append(res_tmp)
# find best index
_, bkp_opt = min(zip(res_list, inds))
# orthogonal projection
proj = np.zeros(signal.shape)
for start, end in pairwise(sorted([0, bkp_opt] + bkps)):
y = signal[start:end]
x = self.covariates[start:end]
coef, _, _, _ = lstsq(x, y)
proj[start:end] = x.dot(coef).reshape(-1, 1)
residual = signal - proj
# stopping criterion
stop = True
if n_bkps is not None:
if len(bkps) - 1 < n_bkps:
stop = False
elif pen is not None:
if res_norm - residual.var() * n_samples > pen:
stop = False
elif epsilon is not None:
if residual.var() * n_samples > epsilon:
bkps = [self.n_samples]
residual = self.signal
inds = np.arange(1, self.n_samples)
correction = 1 / inds + 1 / inds[::-1]
while not stop:
res_norm = norm(residual)
# greedy search
raw_corr = np.sum(residual.cumsum(axis=0)**2, axis=1)
correlation = raw_corr[:-1].flatten() * correction
bkp_opt, _ = max(
enumerate(correlation, start=1), key=lambda x: x[1])
# orthogonal projection
proj = np.zeros(self.signal.shape)
for (start, end) in pairwise(sorted([0, bkp_opt] + bkps)):
proj[start:end] = self.signal[start:end].mean(axis=0)
residual = self.signal - proj
# stopping criterion
stop = True
if n_bkps is not None:
if len(bkps) - 1 < n_bkps:
stop = False
elif pen is not None:
if res_norm - norm(residual) > pen:
stop = False
elif epsilon is not None:
if norm(residual) > epsilon:
stop = False
# update
if not stop: