# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# then use it in creating clique potential
if not is_used[factor] and set(factor.scope()).issubset(node):
clique_factors.append(factor)
is_used[factor] = True
# To compute clique potential, initially set it as unity factor
var_card = [self.get_cardinality()[x] for x in node]
clique_potential = DiscreteFactor(
node, var_card, np.ones(np.product(var_card))
)
# multiply it with the factors associated with the variables present
# in the clique (or node)
# Checking if there's clique_factors, to handle the case when clique_factors
# is empty, otherwise factor_product with throw an error [ref #889]
if clique_factors:
clique_potential *= factor_product(*clique_factors)
clique_trees.add_factors(clique_potential)
if not all(is_used.values()):
raise ValueError(
"All the factors were not used to create Junction Tree."
"Extra factors are defined."
)
return clique_trees
del working_factors[var]
for variable in phi.variables:
working_factors[variable].add(phi)
eliminated_variables.add(var)
# Step 4: Prepare variables to be returned.
final_distribution = set()
for node in working_factors:
factors = working_factors[node]
for factor in factors:
if not set(factor.variables).intersection(eliminated_variables):
final_distribution.add(factor)
if joint:
if isinstance(self.model, BayesianModel):
return factor_product(*final_distribution).normalize(inplace=False)
else:
return factor_product(*final_distribution)
else:
query_var_factor = {}
for query_var in variables:
phi = factor_product(*final_distribution)
query_var_factor[query_var] = phi.marginalize(
list(set(variables) - set([query_var])), inplace=False
).normalize(inplace=False)
return query_var_factor
def _get_factor(self, belief_prop, evidence):
    """
    Extracts the required factor from the junction tree.

    Parameters
    ----------
    belief_prop: Belief Propagation
        Belief Propagation which needs to be updated.

    evidence: dict
        a dict key, value pair as {var: state_of_var_observed}
    """
    # Collapse every clique potential of the junction tree into a single factor.
    factor = factor_product(*belief_prop.junction_tree.get_factors())

    # Restrict that factor (in place) to the observed evidence states.
    # Evidence variables that do not appear in the factor's scope are ignored.
    for observed_var, observed_state in (evidence or {}).items():
        if observed_var in factor.scope():
            factor.reduce([(observed_var, observed_state)])
    return factor
final_distribution = set()
for node in working_factors:
factors = working_factors[node]
for factor in factors:
if not set(factor.variables).intersection(eliminated_variables):
final_distribution.add(factor)
if joint:
if isinstance(self.model, BayesianModel):
return factor_product(*final_distribution).normalize(inplace=False)
else:
return factor_product(*final_distribution)
else:
query_var_factor = {}
for query_var in variables:
phi = factor_product(*final_distribution)
query_var_factor[query_var] = phi.marginalize(
list(set(variables) - set([query_var])), inplace=False
).normalize(inplace=False)
return query_var_factor
If array-like: The elimination order to use.
If None: A random elimination order is used.
"""
# Step 1: Deal with the input arguments.
if isinstance(variables, str):
raise TypeError("variables must be a list of strings")
if isinstance(evidence, str):
raise TypeError("evidence must be a list of strings")
# Dealing with the case when variables is not provided.
if not variables:
all_factors = []
for factor_li in self.factors.values():
all_factors.extend(factor_li)
if joint:
return factor_product(*set(all_factors))
else:
return set(all_factors)
# Step 2: Prepare data structures to run the algorithm.
eliminated_variables = set()
# Get working factors and elimination order
working_factors = self._get_working_factors(evidence)
elimination_order = self._get_elimination_order(
variables, evidence, elimination_order, show_progress=show_progress
)
# Step 3: Run variable elimination
if show_progress:
pbar = tqdm(elimination_order)
else:
pbar = elimination_order