Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_merge_nodes(self):
    """Check ``merge_nodes`` on a complete graph with 100 vertices.

    A CPM partition with ``resolution_parameter=0.5`` is merged while
    considering all neighbouring communities. The test then expects a
    single community of size 100 and a total internal weight equal to the
    number of edges in the graph.
    """
    complete_graph = ig.Graph.Full(100)
    cpm_partition = leidenalg.CPMVertexPartition(
        complete_graph, resolution_parameter=0.5)
    self.optimiser.merge_nodes(
        cpm_partition, consider_comms=leidenalg.ALL_NEIGH_COMMS)

    # All vertices should have collapsed into one community.
    community_sizes = cpm_partition.sizes()
    self.assertListEqual(
        community_sizes, [100],
        msg="CPMVertexPartition(resolution_parameter=0.5) of complete graph after merge nodes incorrect.")

    # With a single community, every edge lies inside it.
    internal_weight = cpm_partition.total_weight_in_all_comms()
    self.assertEqual(
        internal_weight,
        complete_graph.ecount(),
        msg="total_weight_in_all_comms not equal to ecount of graph.")
def test_optimiser(self):
    """Check that optimisation recovers ten disjoint trees as communities.

    The graph is the disjoint union of ten undirected trees with ten
    vertices each. With CPM at ``resolution_parameter=0`` the optimised
    partition is expected to consist of ten communities of size ten.
    """
    trees = [
        ig.Graph.Tree(10, 3, mode=ig.TREE_UNDIRECTED)
        for _ in range(10)
    ]
    forest = reduce(ig.Graph.disjoint_union, trees)
    cpm_partition = leidenalg.CPMVertexPartition(
        forest, resolution_parameter=0)

    self.optimiser.consider_comms = leidenalg.ALL_NEIGH_COMMS
    self.optimiser.optimise_partition(cpm_partition)

    expected_sizes = [10] * 10
    self.assertListEqual(
        cpm_partition.sizes(), expected_sizes,
        msg="After optimising partition failed to find different components with CPMVertexPartition(resolution_parameter=0)")
def test_diff_move_node_optimality(self):
    """Check that no node can be moved to a better neighbouring community.

    ``move_nodes`` is repeated on a random Erdos-Renyi graph until it
    returns a value that is not greater than zero. Afterwards, for every
    vertex and every community found among its neighbours, ``diff_move``
    must not exceed a small tolerance.
    """
    random_graph = ig.Graph.Erdos_Renyi(
        100, p=5. / 100, directed=False, loops=False)
    cpm_partition = leidenalg.CPMVertexPartition(
        random_graph, resolution_parameter=0.1)

    # Repeat node moves for as long as a pass returns a positive value.
    while self.optimiser.move_nodes(
            cpm_partition, consider_comms=leidenalg.ALL_NEIGH_COMMS) > 0:
        pass

    failure_msg = "Was able to move a node to a better community, violating node optimality."
    for vertex in random_graph.vs:
        neighbour_comms = {
            cpm_partition.membership[nb.index] for nb in vertex.neighbors()
        }
        for comm in neighbour_comms:
            # Allow for a small difference up to rounding error.
            self.assertLessEqual(
                cpm_partition.diff_move(vertex.index, comm), 1e-10,
                msg=failure_msg)
def test_neg_weight_bipartite(self):
    """Check that negative weights recover the two sides of a bipartite graph.

    Every edge of a complete 50-by-50 bipartite graph gets weight -0.1 and
    the CPM partition uses ``resolution_parameter=-0.1``. After optimising
    with all communities considered, two communities of size 50 are
    expected.
    """
    bipartite = ig.Graph.Full_Bipartite(50, 50)
    bipartite.es['weight'] = -0.1
    cpm_partition = leidenalg.CPMVertexPartition(
        bipartite, resolution_parameter=-0.1, weights='weight')

    self.optimiser.consider_comms = leidenalg.ALL_COMMS
    self.optimiser.optimise_partition(cpm_partition)

    expected_sizes = [50] * 2
    self.assertListEqual(
        cpm_partition.sizes(), expected_sizes,
        msg="After optimising partition failed to find bipartite structure with CPMVertexPartition(resolution_parameter=-0.1)")
# NOTE: initial membership is singletons except for atlas nodes, which
# get the membership they have. Each of the first n_fixed entries is
# repeated int(self.sizes[isi]) times under the same label; every later
# entry contributes exactly one label.
initial_membership = []
for isi in range(N):
    n_copies = int(self.sizes[isi]) if isi < n_fixed else 1
    initial_membership.extend([isi] * n_copies)

# Sanity check: the expanded list must have exactly Ne entries.
if len(initial_membership) != Ne:
    raise ValueError('initial_membership list has wrong length!')

# Compute communities with semi-supervised Leiden: pick the partition
# class first, then build it with the shared arguments.
if clustering_metric == 'cpm':
    partition_class = leidenalg.CPMVertexPartition
elif clustering_metric == 'modularity':
    partition_class = leidenalg.ModularityVertexPartition
else:
    raise ValueError(
        'clustering_metric not understood: {:}'.format(clustering_metric))
partition = partition_class(
    g,
    resolution_parameter=resolution_parameter,
    initial_membership=initial_membership,
)

# Flag (1) the first n_fixede of the Ne entries as fixed, 0 otherwise.
fixed_nodes = [1 if i < n_fixede else 0 for i in range(Ne)]
opt.optimise_partition(partition, fixed_nodes=fixed_nodes)
# Flag (1) the first n_fixed of the N nodes as fixed, 0 otherwise.
fixed_nodes = [1 if i < n_fixed else 0 for i in range(N)]

# NOTE: initial membership is singletons except for atlas nodes, which
# get the membership they have. The first n_fixed nodes are labelled by
# the position of their aa value among the unique values of aa; each
# remaining node gets its own label, numbered after the unique values.
aau = list(np.unique(aa))
aaun = len(aau)
initial_membership = [
    aau.index(aa[j]) if j < n_fixed else aaun + (j - n_fixed)
    for j in range(N)
]

# Pick the partition class for the configured metric, then build it with
# the shared arguments.
if self.metric == 'cpm':
    partition_class = leidenalg.CPMVertexPartition
elif self.metric == 'modularity':
    partition_class = leidenalg.ModularityVertexPartition
else:
    raise ValueError(
        'clustering_metric not understood: {:}'.format(self.metric))
partition = partition_class(
    g,
    resolution_parameter=self.resolution_parameter,
    initial_membership=initial_membership,
)

# Run modified Leiden here
opt.optimise_partition(partition, fixed_nodes=fixed_nodes)
# NOTE: initial membership is singletons except for atlas nodes, which
# get the membership they have. The list starts as a copy of aa; each of
# the remaining N - n_fixed nodes then receives the smallest non-negative
# integer that is not in aau and has not been handed out yet.
used_labels = set(aau)
initial_membership = list(aa)
i = 0
n_missing = N - n_fixed
while n_missing > 0:
    # Candidates only ever increase, so a label is never handed out twice.
    if i not in used_labels:
        initial_membership.append(i)
        n_missing -= 1
    i += 1
del used_labels

# Compute communities with semi-supervised Leiden: pick the partition
# class first, then build it with the shared arguments.
if clustering_metric == 'cpm':
    partition_class = leidenalg.CPMVertexPartition
elif clustering_metric == 'modularity':
    partition_class = leidenalg.ModularityVertexPartition
else:
    raise ValueError(
        'clustering_metric not understood: {:}'.format(clustering_metric))
partition = partition_class(
    g,
    resolution_parameter=resolution_parameter,
    initial_membership=initial_membership,
)

# Flag (1) the first n_fixed of the N nodes as fixed, 0 otherwise.
fixed_nodes = [1 if i < n_fixed else 0 for i in range(N)]
opt.optimise_partition(partition, fixed_nodes=fixed_nodes)