Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def hamming(bkps1, bkps2):
    """Modified Hamming distance between two partitions.

    Both partitions are turned into membership matrices (via
    ``membership_mat``) and compared entry by entry. The absolute
    differences located strictly above the main diagonal are summed, i.e.
    the disagreements are counted once per pair of points (x, y), x != y.
    The total is then divided by ``n_samples ** 2 / 2`` so that the result
    lies between 0 and 1.

    Args:
        bkps1 (list): list of the last index of each regime.
        bkps2 (list): list of the last index of each regime.

    Returns:
        float: Hamming distance.
    """
    sanity_check(bkps1, bkps2)
    # The largest breakpoint is taken as the number of samples.
    n_samples = max(bkps1)
    mismatch = abs(membership_mat(bkps1) - membership_mat(bkps2))
    # k=1 leaves the main diagonal out of the sum; the multiplication by
    # 1. forces a floating point total.
    upper_total = triu(mismatch, k=1).sum() * 1.
    scale = n_samples * n_samples / 2
    return upper_total / scale
def precision_recall(true_bkps, my_bkps, margin=10):
    """Precision and recall of an estimated segmentation.

    An estimated breakpoint ``my_b`` matches a true breakpoint ``true_b``
    when ``my_b - margin < true_b < my_b + margin`` (strict inequalities).
    Pairs are examined in the order produced by
    ``product(true_bkps[:-1], my_bkps[:-1])`` and each estimated breakpoint
    is consumed by the first match it takes part in. The number of true
    positives is the number of distinct true breakpoints that obtained at
    least one match.

    The last element of each list (last index of the final regime) is not
    a changepoint and is ignored.

    Args:
        true_bkps (list): list of the last index of each regime (true
            partition).
        my_bkps (list): list of the last index of each regime (computed
            partition).
        margin (int, optional): allowed error (in points).

    Returns:
        tuple: (precision, recall). ``(0, 0)`` is returned when either
            partition contains no changepoint (a single entry), since one
            of the two ratios would otherwise be a division by zero.

    Raises:
        AssertionError: if ``margin`` is not strictly positive.
    """
    sanity_check(true_bkps, my_bkps)
    # NOTE: kept as an assert so that callers still see AssertionError.
    assert margin > 0, "Margin of error must be positive (margin = {})".format(
        margin)

    n_true = len(true_bkps) - 1
    n_estimated = len(my_bkps) - 1
    # Degenerate partitions: without any changepoint on one side the
    # corresponding ratio has a zero denominator.
    if n_estimated == 0 or n_true == 0:
        return 0, 0

    used = set()  # estimated breakpoints already matched
    true_pos = set()  # true breakpoints detected at least once
    for true_b, my_b in product(true_bkps[:-1], my_bkps[:-1]):
        within_margin = my_b - margin < true_b < my_b + margin
        if within_margin and my_b not in used:
            used.add(my_b)
            true_pos.add(true_b)

    tp_ = len(true_pos)
    precision = tp_ / n_estimated
    recall = tp_ / n_true
    return precision, recall
def hausdorff(bkps1, bkps2):
    """Hausdorff distance between two sets of changepoints.

    For each changepoint of one partition, the distance to the closest
    changepoint of the other partition is computed; the result is the
    largest of these closest distances, taken over both directions.

    Args:
        bkps1 (list): list of the last index of each regime.
        bkps2 (list): list of the last index of each regime.

    Returns:
        float: Hausdorff distance.
    """
    sanity_check(bkps1, bkps2)
    # The final entry of each list is dropped; the remaining values are
    # laid out as column vectors, the 2-D layout passed to cdist.
    column1 = np.array(bkps1[:-1]).reshape(-1, 1)
    column2 = np.array(bkps2[:-1]).reshape(-1, 1)
    # pairwise[i, j]: distance between bkps1[i] and bkps2[j].
    pairwise = cdist(column1, column2)
    farthest_from_1 = pairwise.min(axis=0).max()  # worst bkps2 -> bkps1 gap
    farthest_from_2 = pairwise.min(axis=1).max()  # worst bkps1 -> bkps2 gap
    return max(farthest_from_1, farthest_from_2)
def meantime(true_bkps, my_bkps):
    """Mean time error of an estimated segmentation.

    For each computed changepoint, the distance (in number of points) to
    the closest true changepoint is measured; the mean of these distances
    is returned. Not a symmetric function: swapping the two arguments
    generally changes the result.

    Args:
        true_bkps (list): list of the last index of each regime (true
            partition).
        my_bkps (list): list of the last index of each regime (computed
            partition).

    Returns:
        float: mean time error.
    """
    sanity_check(true_bkps, my_bkps)
    # The final entry of each list is dropped; the rest become column
    # vectors for cdist.
    true_column = np.array(true_bkps[:-1]).reshape(-1, 1)
    estimated_column = np.array(my_bkps[:-1]).reshape(-1, 1)
    # Rows index true changepoints, columns index computed ones, so the
    # minimum along axis 0 yields one value per computed changepoint.
    closest_true = cdist(true_column, estimated_column).min(axis=0)
    assert len(closest_true) == len(my_bkps) - 1
    return closest_true.mean()
def zero_one_loss(bkps1, bkps2):
    """Zero-one loss on the number of breakpoints.

    The loss is 0 when both lists have the same length (same number of
    breakpoints) and 1 when the lengths differ.

    Args:
        bkps1 (list): list of the last index of each regime.
        bkps2 (list): list of the last index of each regime.

    Returns:
        int: 0 or 1
    """
    sanity_check(bkps1, bkps2)
    same_count = len(bkps1) == len(bkps2)
    return 0 if same_count else 1