Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
r2 = Fluent.create_fluent(running(c2), 1)
r3 = Fluent.create_fluent(running(c3), 1)
head = Term('__s0__')
body = ~r1 & ~r2 & ~r3
rule = head << body
node = engine.add_rule(head, [~r1, ~r2, ~r3])
self.assertTrue(engine.get_rule(node), rule)
head = Term('__s3__')
body = r1 & r2 & ~r3
node = engine.add_rule(head, [r1, r2, ~r3])
rule = engine.get_rule(node)
self.assertTrue(rule, head << body)
head = Term('__s7__')
body = r1 & r2 & r3
rule = head << body
node = engine.add_rule(head, [r1, r2, r3])
self.assertTrue(engine.get_rule(node), rule)
running = Term('running')
c1 = Constant('c1')
c2 = Constant('c2')
c3 = Constant('c3')
r1 = Fluent.create_fluent(running(c1), 1)
r2 = Fluent.create_fluent(running(c2), 1)
r3 = Fluent.create_fluent(running(c3), 1)
head = Term('__s0__')
body = ~r1 & ~r2 & ~r3
rule = head << body
node = engine.add_rule(head, [~r1, ~r2, ~r3])
self.assertTrue(engine.get_rule(node), rule)
head = Term('__s3__')
body = r1 & r2 & ~r3
node = engine.add_rule(head, [r1, r2, ~r3])
rule = engine.get_rule(node)
self.assertTrue(rule, head << body)
head = Term('__s7__')
body = r1 & r2 & r3
rule = head << body
node = engine.add_rule(head, [r1, r2, r3])
self.assertTrue(engine.get_rule(node), rule)
def test_state_space(self):
running = Term('running')
fluents = [ running.with_args(Constant('c%d' % i), Constant(0)) for i in range(1, 4) ]
states = StateSpace(fluents)
for i, state in enumerate(states):
self.assertEqual(len(state), 3)
n = 0
for j, (fluent, value) in enumerate(state.items()):
self.assertEqual(fluent.functor, 'running')
self.assertEqual(fluent.args[0], 'c%d' % (j + 1))
self.assertEqual(fluent.args[-1], 0)
n += value * (2**j)
self.assertEqual(n, i)
def test_action_space(self):
reboot = Term('reboot')
computers = [ Constant('c%i' % i) for i in range(1, 4) ]
fluents = [ reboot(c) for c in computers ]
fluents.append(reboot(Constant('none')))
actions = ActionSpace(fluents)
for i, action in enumerate(actions):
self.assertEqual(sum(action.values()), 1)
for j, (fluent, value) in enumerate(action.items()):
self.assertEqual(fluent, fluents[j])
if j == i:
self.assertEqual(value, 1)
else:
self.assertEqual(value, 0)
def test_action_space(self):
reboot = Term('reboot')
computers = [ Constant('c%i' % i) for i in range(1, 4) ]
fluents = [ reboot(c) for c in computers ]
fluents.append(reboot(Constant('none')))
actions = ActionSpace(fluents)
for i, action in enumerate(actions):
self.assertEqual(sum(action.values()), 1)
for j, (fluent, value) in enumerate(action.items()):
self.assertEqual(fluent, fluents[j])
if j == i:
self.assertEqual(value, 1)
else:
self.assertEqual(value, 0)
def test_add_assignment(self):
engine = self.engines['sysadmin']
fluents = engine.declarations('state_fluent')
for i in range(2**len(fluents)):
state = Term('__s%d__' % i)
value = (-1)**(i % 2) * 10.0*i
node = engine.add_assignment(state, value)
fact = engine.get_fact(node)
self.assertEqual(fact.functor, 'utility')
self.assertEqual(fact.args, (state, Constant(value)))
n = len(self._next_state_atoms)
valuation = [0]*n
for i in range(2**n):
body_atoms = []
for pos in range(n):
if valuation[pos] == 1:
body_atoms.append(self._next_state_atoms[pos])
else:
body_atoms.append(~self._next_state_atoms[pos])
body = And.from_list(body_atoms)
head = Term('__s{}__'.format(i))
self._value_function_atoms.append(head)
rule = head << body
self._db.add_clause(rule)
value = Term('utility', head.with_probability(None), Constant(0.0))
self._db.add_fact(value)
MDPProbLog.next_valuation(valuation)
def _get_decision_facts(self):
self._action_decision_facts = []
self._state_decision_facts = []
for i, n, t in self._gp:
if t == 'atom' and n.probability == Term('?'):
functor = n.name.functor
if functor in self._state_functors:
self._state_decision_facts.append(n.name)
else:
self._action_decision_facts.append(n.name)
self._state_decision_facts = sorted(self._state_decision_facts, key=Term.__repr__)
def __init__(self, model, gamma, epsilon):
self._model = model
self._gamma = gamma
self._epsilon = epsilon
print(" >> Preprocessing program ...", end=" ")
start = time.clock()
self._db = self._eng.prepare(PrologString(model))
self._build_state_atoms()
self._get_action_atoms()
self._build_action_rules()
self._build_value_function_rules()
self._utilities = dict(self._eng.query(self._db, Term('utility', None, None)))
end = time.clock()
uptime = end-start
print("Done in {0:.3f}sec.".format(uptime))
print(" >> Relevant grounding ...", end=" ")
start = time.clock()
self._gp = self._eng.ground_all(self._db, target=None, queries=self._utilities.keys())
end = time.clock()
uptime = end-start
print("Done in {0:.3f}sec.".format(uptime))
print(" >> Compilation ...", end=" ")
start = time.clock()
self._knowledge = get_evaluatable(None).create_from(self._gp)
end = time.clock()
uptime = end-start
def _build_state_atoms(self):
state_vars = [p[0] for p in self._eng.query(self._db, Term('state_fluent', None))]
self._state_functors = set()
self._next_state_atoms = []
self._current_state_atoms = []
for t in state_vars:
self._state_functors.add(t.functor)
args = t.args + (Constant(1),)
self._next_state_atoms.append(t.with_args(*args))
args = t.args + (Constant(0),)
curr_state_atom = t.with_args(*args)
self._current_state_atoms.append(curr_state_atom)
self._db.add_fact(curr_state_atom.with_probability(Term('?')))