Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self):
    """Declare the first distributions of the lung-cancer example.

    Builds the two root priors plus the Cancer and XRay conditional tables,
    printing the Smoker prior and the Cancer table along the way. Nothing is
    stored on ``self`` in the portion of the constructor shown here.
    """
    pollution_prior = DiscreteDistribution({'F': 0.9, 'T': 0.1})
    smoker_prior = DiscreteDistribution({'T': 0.3, 'F': 0.7})
    print(smoker_prior)

    # Every row ends with a probability. The leading entries are presumably
    # the parent values (in parent-list order) followed by this variable's
    # own value -- confirm against the pomegranate documentation.
    cancer_rows = [
        ['T', 'T', 'T', 0.05],
        ['T', 'T', 'F', 0.95],
        ['T', 'F', 'T', 0.02],
        ['T', 'F', 'F', 0.98],
        ['F', 'T', 'T', 0.03],
        ['F', 'T', 'F', 0.97],
        ['F', 'F', 'T', 0.001],
        ['F', 'F', 'F', 0.999],
    ]
    cancer_table = ConditionalProbabilityTable(
        cancer_rows, [pollution_prior, smoker_prior]
    )
    print(cancer_table)

    xray_rows = [
        ['T', 'T', 0.9],
        ['T', 'F', 0.1],
        ['F', 'T', 0.2],
        ['F', 'F', 0.8],
    ]
    xray_table = ConditionalProbabilityTable(xray_rows, [cancer_table])
# NOTE(review): this excerpt starts part-way through a routine and has lost
# its indentation. `parent`, `flag`, `id_`, `update`, `processing_order`,
# `nodes_parents` and `num_samples` are bound by code that is not visible.
# A parent that has not been scheduled yet clears the readiness flag.
if parent not in processing_order:
flag = False
# When the flag survived, schedule this node and record that progress was made.
if flag:
processing_order.append(id_)
update = True
# Presumably guards against a pass that scheduled nothing -- TODO confirm
# where this sits relative to the enclosing loops.
assert update
# One int32 column per entry of nodes_parents, one row per requested sample.
data = np.zeros((num_samples, len(nodes_parents)), dtype='int32')
# Fill the columns in the order established above.
for current in processing_order:
distribution = self.model.states[current].distribution
if isinstance(distribution, DiscreteDistribution):
# The whole column is drawn in a single call.
data[:, current] = distribution.sample(num_samples)
else:
assert isinstance(distribution, ConditionalProbabilityTable)
# Size of the per-key vector: largest key (parsed with int()) plus one.
output_size = list(distribution.keys())
output_size = max([int(x) for x in output_size]) + 1
# Round-trip through JSON and keep only the 'table' entry (a list of rows).
distribution = json.loads(distribution.to_json())
distribution = distribution['table']
# Keyed by the tuple of all row entries except the last two.
distribution_dict = {}
for row in distribution:
key = tuple(np.asarray(row[:-2], dtype='int'))
# Second-to-last entry is read as an int, last entry as a float.
output = int(row[-2])
p = float(row[-1])
if key not in distribution_dict:
distribution_dict[key] = np.zeros(output_size)
# NOTE(review): the excerpt ends here; `output` and `p` are not used in the
# visible lines.
# NOTE(review): this excerpt starts part-way through a routine and has lost
# its indentation. `parents`, `flag`, `id_`, `update` and `processing_order`
# are bound by code that is not visible here.
# Any parent that is not yet in processing_order clears the readiness flag.
for parent in parents:
if not parent in processing_order:
flag = False
# When the flag survived, schedule this node and record that progress was made.
if flag:
processing_order.append(id_)
update = True
# Presumably guards against a pass that scheduled nothing -- TODO confirm
# where this sits relative to the enclosing loops.
assert update
# Draw `n` samples column by column in `processing_order`, then map every
# column through the 'i2s' entry of self.meta.
# NOTE(review): `n`, `nodes_parents`, `processing_order` and `map_col` are
# bound outside this excerpt.
data = np.empty((n, len(nodes_parents)), dtype='S128')
for current in processing_order:
    distribution = self.model.states[current].distribution
    if isinstance(distribution, DiscreteDistribution):
        # No parents involved: the whole column is drawn in one call.
        data[:, current] = distribution.sample(n)
        continue

    assert isinstance(distribution, ConditionalProbabilityTable)
    parents_map = nodes_parents[current]
    parents = distribution.parents
    for _id in range(n):
        values = {}
        for parent, parent_col in zip(parents, parents_map):
            cell = data[_id, parent_col]
            try:
                cell = cell.decode('utf8')
            except (AttributeError, UnicodeDecodeError):
                # Best effort: keep the raw value when it is not bytes or
                # is not valid UTF-8. Anything else (e.g. KeyboardInterrupt)
                # must propagate instead of being silently swallowed.
                pass
            values[parent] = cell
        # Rows are sampled one at a time because each draw is conditioned
        # on the parent values already stored in this row.
        data[_id, current] = distribution.sample(parent_values=values)

data_t = np.zeros(data.shape)
for col_id in range(data.shape[1]):
    data_t[:, col_id] = map_col(self.meta[col_id]['i2s'], data[:, col_id])
def __init__(self):
    """Declare the distributions and nodes of the lung-cancer example.

    Creates priors for Pollution and Smoker, conditional tables for Cancer,
    XRay and Dyspnoea, and wraps each of the five in a named ``Node``. The
    Smoker prior and the Cancer table are printed as they are created.
    Nothing is stored on ``self`` in the portion shown here.
    """
    # Table data first; every row ends with a probability. The leading
    # entries are presumably parent values followed by the variable's own
    # value -- confirm against the pomegranate documentation.
    cancer_rows = [
        ['T', 'T', 'T', 0.05],
        ['T', 'T', 'F', 0.95],
        ['T', 'F', 'T', 0.02],
        ['T', 'F', 'F', 0.98],
        ['F', 'T', 'T', 0.03],
        ['F', 'T', 'F', 0.97],
        ['F', 'F', 'T', 0.001],
        ['F', 'F', 'F', 0.999],
    ]
    xray_rows = [
        ['T', 'T', 0.9],
        ['T', 'F', 0.1],
        ['F', 'T', 0.2],
        ['F', 'F', 0.8],
    ]
    dyspnoea_rows = [
        ['T', 'T', 0.65],
        ['T', 'F', 0.35],
        ['F', 'T', 0.3],
        ['F', 'F', 0.7],
    ]

    pollution = DiscreteDistribution({'F': 0.9, 'T': 0.1})
    smoker = DiscreteDistribution({'T': 0.3, 'F': 0.7})
    print(smoker)

    cancer = ConditionalProbabilityTable(cancer_rows, [pollution, smoker])
    print(cancer)

    # XRay and Dyspnoea both take the Cancer table as their only parent.
    xray = ConditionalProbabilityTable(xray_rows, [cancer])
    dyspnoea = ConditionalProbabilityTable(dyspnoea_rows, [cancer])

    s1 = Node(pollution, name="Pollution")
    s2 = Node(smoker, name="Smoker")
    s3 = Node(cancer, name="Cancer")
    s4 = Node(xray, name="XRay")
    s5 = Node(dyspnoea, name="Dyspnoea")
def __init__(self):
    """Declare the Rain / Sprinkler / Wet example network.

    Creates one prior (Rain) and two conditional tables (Sprinkler given
    Rain, Wet given Sprinkler and Rain), wraps them in named nodes and adds
    the nodes to a ``BayesianNetwork`` called "Simple fully connected". No
    edges are added in the portion shown here.
    """
    rain = DiscreteDistribution({'T': 0.2, 'F': 0.8})

    # Every row ends with a probability; the leading entries are presumably
    # parent values followed by the variable's own value -- confirm against
    # the pomegranate documentation.
    sprinkler_rows = [
        ['F', 'T', 0.4],
        ['F', 'F', 0.6],
        ['T', 'T', 0.1],
        ['T', 'F', 0.9],
    ]
    sprinkler = ConditionalProbabilityTable(sprinkler_rows, [rain])

    wet_rows = [
        ['F', 'F', 'T', 0.01],
        ['F', 'F', 'F', 0.99],
        ['F', 'T', 'T', 0.8],
        ['F', 'T', 'F', 0.2],
        ['T', 'F', 'T', 0.9],
        ['T', 'F', 'F', 0.1],
        ['T', 'T', 'T', 0.99],
        ['T', 'T', 'F', 0.01],
    ]
    wet = ConditionalProbabilityTable(wet_rows, [sprinkler, rain])

    rain_node = Node(rain, name="Rain")
    sprinkler_node = Node(sprinkler, name="Sprinkler")
    wet_node = Node(wet, name="Wet")

    model = BayesianNetwork("Simple fully connected")
    model.add_states(rain_node, sprinkler_node, wet_node)
# NOTE(review): these two statements look like the tail of a loop whose
# header is not visible; `x`, `v_name`, `edges` and `todo` are bound there.
# Record the pair (x, v_name) in `edges` and add v_name to the `todo` set.
edges.append((x, v_name))
todo.add(v_name)
# Turn every pending entry of `todo` into a ConditionalProbabilityTable,
# wrap all variables in named nodes, then assemble and bake the network.
# NOTE(review): `todo`, `var_nodes`, `var_index_to_name` and `edges` are
# bound outside this excerpt.
while todo:
    for v_name in todo:
        cond_p_table, cond = var_nodes[v_name]
        # cond[0] is skipped; the remaining names must already be resolved
        # because their var_nodes entries are passed on as parents below.
        if any(dep in todo for dep in cond[1:]):
            continue
        cond_t = [var_nodes[dep] for dep in cond[1:]]
        var_nodes[v_name] = ConditionalProbabilityTable(cond_p_table, cond_t)
        # Mutating the set is safe only because iteration stops right here.
        todo.remove(v_name)
        break
    else:
        # Every pending variable waits on another pending one; without this
        # guard the outer loop would spin forever.
        raise ValueError(
            "no remaining variable has all of its dependencies resolved "
            "(cyclic dependency?): " + ", ".join(map(str, todo))
        )

for name in var_index_to_name:
    var_nodes[name] = Node(var_nodes[name], name=name)

model = BayesianNetwork("tmp")
model.add_states(*[var_nodes[name] for name in var_index_to_name])
for edge in edges:
    model.add_edge(var_nodes[edge[0]], var_nodes[edge[1]])
model.bake()
self.model = model
def __init__(self):
# NOTE(review): indentation has been lost and this block looks like two
# different snippets spliced together -- see the notes further down.
# Uniform prior over the three string labels '1', '2' and '3'.
A = DiscreteDistribution({'1': 1./3, '2': 1./3, '3': 1./3})
# Conditional table with A as its only parent; each row ends with a probability.
B = ConditionalProbabilityTable(
[['1','1',0.5],
['1','2',0.5],
['1','3',0],
['2','1',0],
['2','2',0.5],
['2','3',0.5],
['3','1',0.5],
['3','2',0],
['3','3',0.5],
],[A])
C = ConditionalProbabilityTable(
[['1','4',0.5],
['1','5',0.5],
['1','6',0],
['2','4',0],
['2','5',0.5],
# NOTE(review): from here on the rows have four entries instead of three and
# use 'T'/'F' labels, and the parent list names Pollution and Smoker, which
# are never assigned in this block. The table appears to be cut off and
# continued with rows from a different table -- restore from the original.
['T','T','F',0.95],
['T','F','T',0.02],
['T','F','F',0.98],
['F','T','T',0.03],
['F','T','F',0.97],
['F','F','T',0.001],
['F','F','F',0.999],
],[Pollution,Smoker])
# NOTE(review): Cancer is not assigned anywhere in the visible block.
print(Cancer)
# Both tables below take Cancer as their only parent.
XRay = ConditionalProbabilityTable(
[['T','T',0.9],
['T','F',0.1],
['F','T',0.2],
['F','F',0.8],
],[Cancer])
Dyspnoea = ConditionalProbabilityTable(
[['T','T',0.65],
['T','F',0.35],
['F','T',0.3],
['F','F',0.7],
],[Cancer])
# Wrap each distribution in a named node.
s1 = Node(Pollution, name="Pollution")
s2 = Node(Smoker, name="Smoker")
s3 = Node(Cancer, name="Cancer")
s4 = Node(XRay, name="XRay")
s5 = Node(Dyspnoea, name="Dyspnoea")
# Register all five nodes, then connect Pollution->Cancer, Smoker->Cancer
# and Cancer->XRay.
model = BayesianNetwork("Lung Cancer")
model.add_states(s1, s2, s3, s4, s5)
model.add_edge(s1, s3)
model.add_edge(s2, s3)
model.add_edge(s3, s4)
# NOTE(review): no edge involving s5 and no model.bake() call appear in the
# visible lines -- the excerpt is probably truncated here.
# NOTE(review): this excerpt starts part-way through a routine and has lost
# its indentation. `parent`, `flag`, `id_`, `update`, `processing_order`,
# `nodes_parents`, `n` and `model` are bound by code that is not visible.
# A parent that has not been scheduled yet clears the readiness flag.
if not parent in processing_order:
flag = False
# When the flag survived, schedule this node and record that progress was made.
if flag:
processing_order.append(id_)
update = True
# Presumably guards against a pass that scheduled nothing -- TODO confirm
# where this sits relative to the enclosing loops.
assert update
# One int32 column per entry of nodes_parents, one row per requested sample.
data = np.zeros((n, len(nodes_parents)), dtype='int32')
# Fill the columns in the order established above.
for current in processing_order:
distribution = model.states[current].distribution
# NOTE(review): exact type comparison; isinstance() would also accept subclasses.
if type(distribution) == DiscreteDistribution:
# The whole column is drawn in a single call.
data[:, current] = distribution.sample(n)
else:
assert type(distribution) == ConditionalProbabilityTable
# Size of the per-key vector: largest key (parsed with int()) plus one.
output_size = list(distribution.keys())
output_size = max([int(x) for x in output_size]) + 1
# Round-trip through JSON and keep only the 'table' entry (a list of rows).
distribution = json.loads(distribution.to_json())
distribution = distribution['table']
# Keyed by the tuple of all row entries except the last two; each value is a
# vector in which slot `output` holds the probability `p`.
distribution_dict = {}
for row in distribution:
key = tuple(np.asarray(row[:-2], dtype='int'))
# Second-to-last entry is read as an int, last entry as a float.
output = int(row[-2])
p = float(row[-1])
if not key in distribution_dict:
distribution_dict[key] = np.zeros(output_size)
distribution_dict[key][int(output)] = p
# NOTE(review): the excerpt ends here; nothing in the visible lines reads
# distribution_dict back.
def __init__(self):
    """Declare the three-variable chain A -> B -> C.

    A has a uniform prior over the labels '1', '2' and '3'. B is conditioned
    on A and C on B, using tables with the same row values. Each
    distribution is wrapped in a named node and an empty ``BayesianNetwork``
    called "ChainSampler" is created. Nothing is added to the network in
    the portion shown here.
    """
    one_third = 1. / 3
    prior_a = DiscreteDistribution(
        {'1': one_third, '2': one_third, '3': one_third}
    )

    # Every row ends with a probability; the leading entries are presumably
    # the parent value followed by the variable's own value -- confirm
    # against the pomegranate documentation.
    rows_b = [
        ['1', '1', 0.5],
        ['1', '2', 0.5],
        ['1', '3', 0],
        ['2', '1', 0],
        ['2', '2', 0.5],
        ['2', '3', 0.5],
        ['3', '1', 0.5],
        ['3', '2', 0],
        ['3', '3', 0.5],
    ]
    table_b = ConditionalProbabilityTable(rows_b, [prior_a])

    # Same values as rows_b, but kept as a separate list object.
    rows_c = [
        ['1', '1', 0.5],
        ['1', '2', 0.5],
        ['1', '3', 0],
        ['2', '1', 0],
        ['2', '2', 0.5],
        ['2', '3', 0.5],
        ['3', '1', 0.5],
        ['3', '2', 0],
        ['3', '3', 0.5],
    ]
    table_c = ConditionalProbabilityTable(rows_c, [table_b])

    s1 = Node(prior_a, name="A")
    s2 = Node(table_b, name="B")
    s3 = Node(table_c, name="C")
    model = BayesianNetwork("ChainSampler")