Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
from pgmpy.models import JunctionTree
# Check whether the model is valid or not
self.check_model()
# Triangulate the graph to make it chordal
triangulated_graph = self.triangulate()
# Find maximal cliques in the chordal graph
cliques = list(map(tuple, nx.find_cliques(triangulated_graph)))
# If there is only 1 clique, then the junction tree formed is just a
# clique tree with that single clique as the node
if len(cliques) == 1:
clique_trees = JunctionTree()
clique_trees.add_node(cliques[0])
# Else if the number of cliques is more than 1 then create a complete
# graph with all the cliques as nodes and weight of the edges being
# the length of sepset between two cliques
elif len(cliques) >= 2:
complete_graph = UndirectedGraph()
edges = list(itertools.combinations(cliques, 2))
weights = list(map(lambda x: len(set(x[0]).intersection(set(x[1]))), edges))
for edge, weight in zip(edges, weights):
complete_graph.add_edge(*edge, weight=-weight)
# Create clique trees by minimum (or maximum) spanning tree method
clique_trees = JunctionTree(
nx.minimum_spanning_tree(complete_graph).edges()
)
# Conversion of set to tuple just for indexing
nodes_with_query_variables = tuple(nodes_with_query_variables)
# As the junction tree is a tree, there is exactly one path between any two nodes in it;
# thus we can just take the path between any two nodes, no matter what their order is
for i in range(len(nodes_with_query_variables) - 1):
subtree_nodes.update(
nx.shortest_path(
self.junction_tree,
nodes_with_query_variables[i],
nodes_with_query_variables[i + 1],
)
)
subtree_undirected_graph = self.junction_tree.subgraph(subtree_nodes)
# Converting subtree into a junction tree
if len(subtree_nodes) == 1:
subtree = JunctionTree()
subtree.add_node(subtree_nodes.pop())
else:
subtree = JunctionTree(subtree_undirected_graph.edges())
# Select a node as the root node. The root node is one that has only one neighbor
if len(subtree.nodes()) == 1:
root_node = list(subtree.nodes())[0]
else:
root_node = tuple(
filter(lambda x: len(list(subtree.neighbors(x))) == 1, subtree.nodes())
)[0]
clique_potential_list = [self.clique_beliefs[root_node]]
# For other nodes in the subtree compute the clique potentials as follows
# As all the nodes are tuples, a plain set(root_node) won't work, as it would populate the set with
# all the elements of the tuple; instead use set([root_node]), which includes only the tuple, not the internal elements within it.
# thus we can just take the path between any two nodes, no matter what their order is
for i in range(len(nodes_with_query_variables) - 1):
subtree_nodes.update(
nx.shortest_path(
self.junction_tree,
nodes_with_query_variables[i],
nodes_with_query_variables[i + 1],
)
)
subtree_undirected_graph = self.junction_tree.subgraph(subtree_nodes)
# Converting subtree into a junction tree
if len(subtree_nodes) == 1:
subtree = JunctionTree()
subtree.add_node(subtree_nodes.pop())
else:
subtree = JunctionTree(subtree_undirected_graph.edges())
# Select a node as the root node. The root node is one that has only one neighbor
if len(subtree.nodes()) == 1:
root_node = list(subtree.nodes())[0]
else:
root_node = tuple(
filter(lambda x: len(list(subtree.neighbors(x))) == 1, subtree.nodes())
)[0]
clique_potential_list = [self.clique_beliefs[root_node]]
# For other nodes in the subtree compute the clique potentials as follows
# As all the nodes are tuples, a plain set(root_node) won't work, as it would populate the set with
# all the elements of the tuple; instead use set([root_node]), which includes only the tuple, not the
# internal elements within it.
parent_nodes = set([root_node])
nodes_traversed = set()
else:
self.variables = model.nodes()
self.cardinality = {}
self.factors = defaultdict(list)
if isinstance(model, BayesianModel):
for node in model.nodes():
cpd = model.get_cpds(node)
if isinstance(cpd, TabularCPD):
self.cardinality[node] = cpd.variable_card
cpd = cpd.to_factor()
for var in cpd.scope():
self.factors[var].append(cpd)
elif isinstance(model, (MarkovModel, FactorGraph, JunctionTree)):
self.cardinality = model.get_cardinality()
for factor in model.get_factors():
for var in factor.variables:
self.factors[var].append(factor)
elif isinstance(model, DynamicBayesianNetwork):
self.start_bayesian_model = BayesianModel(model.get_intra_edges(0))
self.start_bayesian_model.add_cpds(*model.get_cpds(time_slice=0))
cpd_inter = [model.get_cpds(node) for node in model.get_interface_nodes(1)]
self.interface_nodes = model.get_interface_nodes(0)
self.one_and_half_model = BayesianModel(
model.get_inter_edges() + model.get_intra_edges(1)
)
self.one_and_half_model.add_cpds(
*(model.get_cpds(time_slice=1) + cpd_inter)
def __init__(self, model):
self.model = model
model.check_model()
if isinstance(model, JunctionTree):
self.variables = set(chain(*model.nodes()))
else:
self.variables = model.nodes()
self.cardinality = {}
self.factors = defaultdict(list)
if isinstance(model, BayesianModel):
for node in model.nodes():
cpd = model.get_cpds(node)
if isinstance(cpd, TabularCPD):
self.cardinality[node] = cpd.variable_card
cpd = cpd.to_factor()
for var in cpd.scope():
self.factors[var].append(cpd)
if len(cliques) == 1:
clique_trees = JunctionTree()
clique_trees.add_node(cliques[0])
# Else if the number of cliques is more than 1 then create a complete
# graph with all the cliques as nodes and weight of the edges being
# the length of sepset between two cliques
elif len(cliques) >= 2:
complete_graph = UndirectedGraph()
edges = list(itertools.combinations(cliques, 2))
weights = list(map(lambda x: len(set(x[0]).intersection(set(x[1]))), edges))
for edge, weight in zip(edges, weights):
complete_graph.add_edge(*edge, weight=-weight)
# Create clique trees by minimum (or maximum) spanning tree method
clique_trees = JunctionTree(
nx.minimum_spanning_tree(complete_graph).edges()
)
# Check that the factors jointly cover every random variable in the model:
# the union of all factor scopes must equal the model's node set.
all_vars = itertools.chain(*[factor.scope() for factor in self.factors])
if set(all_vars) != set(self.nodes()):
    # The exception used to be constructed but never raised, so this check
    # silently did nothing; raise it so a mismatch is actually reported.
    raise ValueError("DiscreteFactor for all the random variables not specified")
# Dictionary stating whether the factor is used to create clique
# potential or not
# If false, then it is not used to create any clique potential
is_used = {factor: False for factor in self.factors}
for node in clique_trees.nodes():
clique_factors = []
for factor in self.factors: