Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_compose(self):
    """Check that composing with the group identity returns the input.

    A rotation matrix is built from a rotation vector and composed with
    ``self.group.identity``, once with the identity on the right and once
    with the identity on the left. In both cases the result is compared
    against the original matrix with ``assertAllClose``.
    """
    # 1. Composition by identity, on the right
    # Expect the original transformation
    rot_vec = tf.convert_to_tensor([0.2, -0.1, 0.1])
    mat = self.so3_group.matrix_from_rotation_vector(rot_vec)

    result = self.group.compose(mat, self.group.identity)
    # The expected value goes through helper.to_matrix for this case only;
    # the left-composition case below compares against `mat` directly.
    expected = helper.to_matrix(mat)

    with self.test_session():
        self.assertAllClose(gs.eval(result), gs.eval(expected))

    # 2. Composition by identity, on the left
    # Expect the original transformation
    rot_vec = tf.convert_to_tensor([0.2, 0.1, -0.1])
    mat = self.so3_group.matrix_from_rotation_vector(rot_vec)

    result = self.group.compose(self.group.identity, mat)
    expected = mat

    with self.test_session():
        self.assertAllClose(gs.eval(result), gs.eval(expected))
def setUp(self):
# Fixture setup: seeds the backend random generator, builds a
# SpecialOrthogonalGroup with n = 3 and two InvariantMetric objects that
# share the same inner-product matrix at the identity.
# NOTE(review): none of the locals below is stored on `self` within the
# visible lines; the method presumably continues past this excerpt - confirm.
gs.random.seed(1234)
n = 3
group = SpecialOrthogonalGroup(n=n)
# Diagonal left and right invariant metrics
# Identity matrix of size group.dimension, reused for both metrics.
diag_mat_at_identity = gs.eye(group.dimension)
# Same group and matrix, configured with left_or_right='left'.
left_diag_metric = InvariantMetric(
group=group,
inner_product_mat_at_identity=diag_mat_at_identity,
left_or_right='left')
# Same group and matrix, configured with left_or_right='right'.
right_diag_metric = InvariantMetric(
group=group,
inner_product_mat_at_identity=diag_mat_at_identity,
left_or_right='right')
def to_scalar(expected):
    """Reshape an expected value through two ``gs.to_ndarray`` calls.

    The input is first passed through ``gs.to_ndarray`` with ``to_ndim=1``
    and the result is passed through it again with ``to_ndim=2`` along
    ``axis=-1``.

    Parameters
    ----------
    expected : array-like
        Value to convert.

    Returns
    -------
    array-like
        Output of the second ``gs.to_ndarray`` call.
    """
    as_vector = gs.to_ndarray(expected, to_ndim=1)
    return gs.to_ndarray(as_vector, to_ndim=2, axis=-1)
def main():
"""Plot an Agglomerative Hierarchical Clustering on the sphere."""
# Hypersphere built with dim=2; clustering uses its metric's `dist` callable.
sphere = Hypersphere(dim=2)
sphere_distance = sphere.metric.dist
n_clusters = 2
n_samples_per_dataset = 50
# Two von Mises-Fisher samples of equal size with kappa=10. The second one
# is negated, i.e. every sampled point is mirrored through the origin.
dataset_1 = sphere.random_von_mises_fisher(
kappa=10,
n_samples=n_samples_per_dataset)
dataset_2 = - sphere.random_von_mises_fisher(
kappa=10,
n_samples=n_samples_per_dataset)
# Stack both groups along axis 0 into a single dataset.
dataset = gs.concatenate((dataset_1, dataset_2), axis=0)
# Fit the clustering with the sphere distance, then read the fitted labels.
clustering = AgglomerativeHierarchicalClustering(
n_clusters=n_clusters,
distance=sphere_distance)
clustering.fit(dataset)
clustering_labels = clustering.labels_
# Figure 0 with a single 3D axes; the sphere is drawn onto that axes.
plt.figure(0)
ax = plt.subplot(111, projection='3d')
plt.title('Agglomerative Hierarchical Clustering')
sphere_plot = visualization.Sphere()
sphere_plot.draw(ax=ax)
# For each cluster label, select the rows of `dataset` carrying that label.
# NOTE(review): the loop body appears to continue past this excerpt.
for i_label in range(n_clusters):
points_label_i = dataset[clustering_labels == i_label, ...]
# NOTE(review): fragment - the enclosing method header and the definitions of
# curve_srv, base_curve_srv, base_curve_velocity, base_curve_velocity_norm,
# point, base_point, n_curves, n_coords and n_sampling_points are outside
# this excerpt.
# Pointwise inner product of the SRV difference with the base curve velocity,
# evaluated at the base points with the last sampling point dropped.
inner_prod = self.pointwise_inner_product(curve_srv - base_curve_srv,
base_curve_velocity,
base_point[:, :-1, :])
# Scalar coefficients indexed by (curve, sampling point).
coef_1 = gs.sqrt(base_curve_velocity_norm)
coef_2 = 1 / base_curve_velocity_norm**(3 / 2) * inner_prod
# 'ij,ijk->ijk' scales each vector along the last axis by its (i, j) coefficient.
term_1 = gs.einsum('ij,ijk->ijk', coef_1, curve_srv - base_curve_srv)
term_2 = gs.einsum('ij,ijk->ijk', coef_2, base_curve_velocity)
log_derivative = term_1 + term_2
# Ambient log between the first sampling points of `point` and `base_point`.
log_starting_points = self.ambient_metric.log(
point=point[:, 0, :], base_point=base_point[:, 0, :])
# Tile with reps (1, 1, 1), then swap the first two axes.
# presumably this yields shape (n_curves, 1, n_coords) - TODO confirm
log_starting_points = gs.transpose(
gs.tile(log_starting_points, (1, 1, 1)), (1, 0, 2))
# Prepend a zero block of shape (n_curves, 1, n_coords) to the cumulative sum
# of the derivative taken along axis -2.
# assumes gs.hstack joins along axis 1 like numpy.hstack - TODO confirm
log_cumsum = gs.hstack(
[gs.zeros((n_curves, 1, n_coords)),
gs.cumsum(log_derivative, -2)])
# Starting-point log plus the cumulative sum scaled by 1 / (n_sampling_points - 1).
log = log_starting_points + 1 / (n_sampling_points - 1) * log_cumsum
return log
def _ball_gradient_descent(points, metric, weights=None, max_iter=32,
lr=1e-3, tau=5e-3):
"""Perform ball gradient descent."""
# A single point is returned unchanged.
if len(points) == 1:
return points
# Unweighted case: start from the mean of the points along axis 0.
if weights is None:
iteration = 0
convergence = math.inf
barycenter = gs.mean(points, axis=0, keepdims=True)
# Iterate until the update is no larger than tau or max_iter steps have run.
while convergence > tau and max_iter > iteration:
iteration += 1
# Twice the log of the points at the current barycenter estimate.
grad_tangent = 2 * metric.log(points, barycenter)
# Candidate barycenter: exp at the current estimate of the summed tangent
# vectors scaled by the learning rate.
cc_barycenter = metric.exp(
lr * grad_tangent.sum(0, keepdims=True), barycenter)
# Largest distance between the candidate and the current estimate, read out
# with .item().
convergence = metric.dist(cc_barycenter, barycenter).max().item()
barycenter = cc_barycenter
else:
# Weighted case: add a trailing axis to the weights.
# NOTE(review): the rest of this branch and the return are outside this excerpt.
weights = gs.expand_dims(weights, -1)
Parameters
----------
curve : array-like, shape=[..., n_sampling_points, ambient_dim]
Discrete curve.
Returns
-------
srv : array-like, shape=[..., n_sampling_points - 1, ambient_dim]
Square-root velocity representation of a discrete curve.
"""
# NOTE(review): fragment - the enclosing method header and the opening of its
# docstring are outside this excerpt.
# Bring the input to 3 dimensions and read off its sizes.
curve = gs.to_ndarray(curve, to_ndim=3)
n_curves, n_sampling_points, n_coords = curve.shape
srv_shape = (n_curves, n_sampling_points - 1, n_coords)
# Flatten all curves into one list of points so consecutive rows can be paired.
curve = gs.reshape(curve, (n_curves * n_sampling_points, n_coords))
coef = gs.cast(gs.array(n_sampling_points - 1), gs.float32)
# Scaled ambient log between each row and the next one. Because the curves
# were flattened, this also pairs the last point of one curve with the first
# point of the next; those rows are removed by the mask below.
velocity = coef * self.ambient_metric.log(point=curve[1:, :],
base_point=curve[:-1, :])
velocity_norm = self.ambient_metric.norm(velocity, curve[:-1, :])
# Divide each velocity vector by the square root of its norm.
srv = gs.einsum(
'...i,...->...i', velocity, 1. / gs.sqrt(velocity_norm))
# Keep every row except those where (index + 1) is a multiple of
# n_sampling_points, i.e. pairs starting at the last sampling point of a curve.
index = gs.arange(n_curves * n_sampling_points - 1)
mask = ~gs.equal((index + 1) % n_sampling_points, 0)
index_select = gs.get_slice(index, gs.squeeze(gs.where(mask)))
# Restore the per-curve layout, one row fewer than the input per curve.
srv = gs.reshape(gs.get_slice(srv, index_select), srv_shape)
return srv
# NOTE(review): fragment - the enclosing method header (which supplies
# tangent_vec and base_point) is outside this excerpt.
# Default the base point to the group identity, then regularize it.
identity = self.group.identity
if base_point is None:
base_point = identity
base_point = self.group.regularize(base_point)
# Shortcut: at the identity the result is exp_from_identity directly.
if gs.allclose(base_point, identity):
return self.exp_from_identity(tangent_vec)
# TODO (nguigs): factorize this code to pushforward tangent vec to
# identity by left/right translation
# Jacobian of the translation by base_point on the side given by
# self.left_or_right, then its inverse.
jacobian = self.group.jacobian_translation(
point=base_point, left_or_right=self.left_or_right)
inv_jacobian = gs.linalg.inv(jacobian)
inv_jacobian_transposed = Matrices.transpose(inv_jacobian)
# Contracting with the transposed inverse applies the inverse jacobian to
# tangent_vec, giving the tangent vector at the identity.
tangent_vec_at_id = gs.einsum(
'...i,...ij->...j', tangent_vec, inv_jacobian_transposed)
exp_from_id = self.exp_from_identity(tangent_vec_at_id)
# Compose with base_point on the side matching self.left_or_right.
if self.left_or_right == 'left':
exp = self.group.compose(base_point, exp_from_id)
else:
exp = self.group.compose(exp_from_id, base_point)
exp = self.group.regularize(exp)
return exp
Parameters
----------
base_point : array-like, shape=[..., 2]
Base point.
Returns
-------
mat : array-like, shape=[..., 2, 2]
Inner-product matrix.
"""
# NOTE(review): fragment - the enclosing method header and the opening of its
# docstring are outside this excerpt.
# The two parameters are the entries 0 and 1 along the last axis of base_point.
param_a = base_point[..., 0]
param_b = base_point[..., 1]
# Order-1 polygamma evaluated at a + b, a and b.
# presumably gs.polygamma follows scipy.special.polygamma(n, x) - TODO confirm
polygamma_ab = gs.polygamma(1, param_a + param_b)
polygamma_a = gs.polygamma(1, param_a)
polygamma_b = gs.polygamma(1, param_b)
# Three coefficients stacked on the last axis, handed to
# SymmetricMatrices.from_vector.
# presumably they fill the independent entries of a 2x2 symmetric matrix,
# with -polygamma_ab as the off-diagonal term - TODO confirm ordering
vector = gs.stack(
[polygamma_a - polygamma_ab,
- polygamma_ab,
polygamma_b - polygamma_ab], axis=-1)
return SymmetricMatrices.from_vector(vector)