Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:returns list:
a list of distances to each subquantizer cluster for each subquantizer
"""
px = self.project(x, coarse_codes)
subquantizer_dists = []
if coarse_split is None:
split_iter = iterate_splits(px, self.num_coarse_splits)
else:
split_iter = [(np.split(px, self.num_coarse_splits)[coarse_split], coarse_split)]
# for cx, split in iterate_splits(px, self.num_coarse_splits):
for cx, split in split_iter:
_, _, _, subC = self.get_split_parameters(split)
subquantizer_dists += [((fx - subC[sub_split]) ** 2).sum(axis=1) for fx, sub_split in iterate_splits(cx, self.num_fine_splits)]
return subquantizer_dists
a query vector
:param tuple coarse_codes:
the coarse codes defining which local space to project to
:param int coarse_split:
index of the coarse split to get distances for - if None then all splits
are computed
:returns list:
a list of distances to each subquantizer cluster for each subquantizer
"""
px = self.project(x, coarse_codes)
subquantizer_dists = []
if coarse_split is None:
split_iter = iterate_splits(px, self.num_coarse_splits)
else:
split_iter = [(np.split(px, self.num_coarse_splits)[coarse_split], coarse_split)]
# for cx, split in iterate_splits(px, self.num_coarse_splits):
for cx, split in split_iter:
_, _, _, subC = self.get_split_parameters(split)
subquantizer_dists += [((fx - subC[sub_split]) ** 2).sum(axis=1) for fx, sub_split in iterate_splits(cx, self.num_fine_splits)]
return subquantizer_dists
a list of ndarrays containing cluster centroids for each subvector
:yields int d:
the cell distance approximation used to order cells
:yields tuple cell:
the cell indices
"""
# Infer parameters
splits = len(centroids)
V = centroids[0].shape[0]
# Compute distances to each coarse cluster and sort
cluster_dists = []
sorted_inds = []
for cx, split in iterate_splits(x, splits):
dists = ((cx - centroids[split]) ** 2).sum(axis=1)
inds = np.argsort(dists)
cluster_dists.append(dists)
sorted_inds.append(inds)
# Some helper functions used below
def cell_for_inds(inds):
return tuple([sorted_inds[s][i] for s, i in enumerate(inds)])
def dist_for_cell(cell):
return sum([cluster_dists[s][i] for s, i in enumerate(cell)])
def inds_in_range(inds):
for i in inds:
:param ndarray x:
the point to code
:param ndarray coarse_codes:
the coarse codes for the point
if they are already computed
:returns tuple:
a tuple of fine codes
"""
if coarse_codes is None:
coarse_codes = self.predict_coarse(x)
px = self.project(x, coarse_codes)
fine_codes = []
for cx, split in iterate_splits(px, self.num_coarse_splits):
# Get product quantizer parameters for this split
_, _, _, subC = self.get_split_parameters(split)
# Compute subquantizer codes
fine_codes += [predict_cluster(fx, subC[sub_split]) for fx, sub_split in iterate_splits(cx, self.num_fine_splits)]
return tuple(fine_codes)