Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
dict: {(start, end): cost value, ...}
"""
n_regimes = params.n_regimes
start, end = params.start, params.end
jump, min_size = params.jump, params.min_size
if n_regimes == 1:
return {(start, end): self.error(start, end)}
elif n_regimes > 1:
# Let's fill the list of admissible last breakpoints
multiple_of_jump = (k for k in range(start, end) if k % jump == 0)
admissible_bkps = list()
for bkp in multiple_of_jump:
n_samples = bkp - start
# first check if left subproblem is possible
if sanity_check(n_samples, n_regimes - 1, jump, min_size):
# second check if the right subproblem has enough points
if end - bkp >= min_size:
admissible_bkps.append(bkp)
assert len(
admissible_bkps) > 0, "No admissible last breakpoints found.\
Parameters: {}".format(params)
# Compute the subproblems
sub_problems = list()
for bkp in admissible_bkps:
left_params = Params(n_regimes - 1, start, bkp, jump, min_size)
right_params = Params(1, bkp, end, jump, min_size)
left_partition = self.cache(left_params)
right_partition = self.cache(right_params)
tmp_partition = dict(left_partition)
jump (int, optional): number of points between two potential
breakpoints.
min_size (int, optional): regimes shorter than min_size are
discarded.
Returns:
list: indexes of each breakpoints.
"""
if signal is None:
if self.signal is None:
assert False, "You must define a signal."
else:
self.signal = signal
n_samples = self.signal.shape[0]
assert sanity_check(
n_samples,
n_regimes,
jump,
min_size), "Change n_regimes, jump or min_size."
params = Params(n_regimes=int(n_regimes),
start=0, end=self.signal.shape[0],
jump=int(jump), min_size=int(min_size))
self._partition = self.cache(params)
# we return the end of each segment of the partition
bkps = sorted(e for (s, e) in self._partition)
return bkps
Returns:
dict: {(start, end): cost value, ...}
"""
jump, min_size = self.jump, self.min_size
if n_bkps == 0:
cost = self.cost.error(start, end)
return {(start, end): cost}
elif n_bkps > 0:
# Let's fill the list of admissible last breakpoints
multiple_of_jump = (k for k in range(start, end) if k % jump == 0)
admissible_bkps = list()
for bkp in multiple_of_jump:
n_samples = bkp - start
# first check if left subproblem is possible
if sanity_check(n_samples, n_bkps, jump, min_size):
# second check if the right subproblem has enough points
if end - bkp >= min_size:
admissible_bkps.append(bkp)
assert len(
admissible_bkps) > 0, "No admissible last breakpoints found.\
start, end: ({},{}), n_bkps: {}.".format(start, end, n_bkps)
# Compute the subproblems
sub_problems = list()
for bkp in admissible_bkps:
left_partition = self.seg(start, bkp, n_bkps - 1)
right_partition = self.seg(bkp, end, 0)
tmp_partition = dict(left_partition)
tmp_partition[(bkp, end)] = right_partition[(bkp, end)]
sub_problems.append(tmp_partition)