Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init_machine_with_nested_states(self):
a = State('A')
b = State('B')
b_1 = State('1', parent=b)
b_2 = State('2', parent=b)
m = self.stuff.machine_cls(states=[a, b])
self.assertEqual(b_1.name, 'B{0}1'.format(state_separator))
m.to("B{0}1".format(state_separator))
def test_init_machine_with_nested_states(self):
a = State('A')
b = State('B')
b_1 = State('1', parent=b)
b_2 = State('2', parent=b)
m = self.stuff.machine_cls(states=[a, b])
self.assertEqual(b_1.name, 'B{0}1'.format(state_separator))
m.to("B{0}1".format(state_separator))
def test_diagram(self):
m = self.machine_cls(states=self.states, transitions=self.transitions, initial='A', auto_transitions=False,
title='A test', show_conditions=True, use_pygraphviz=self.use_pygraphviz)
graph = m.get_graph()
self.assertIsNotNone(graph)
self.assertTrue("digraph" in str(graph))
_, nodes, edges = self.parse_dot(graph)
self.assertEqual(len(edges), 14)
# Test that graph properties match the Machine
self.assertEqual(set(m.states.keys()) - set(['C', 'C%s1' % NestedState.separator]),
set(nodes) - set(['C_anchor', 'C%s1_anchor' % NestedState.separator]))
m.walk()
m.run()
# write diagram to temp file
target = tempfile.NamedTemporaryFile(suffix='.png')
m.get_graph().draw(target.name, prog='dot')
self.assertTrue(os.path.getsize(target.name) > 0)
# backwards compatibility check
m.get_graph().draw(target.name, prog='dot')
self.assertTrue(os.path.getsize(target.name) > 0)
# cleanup temp file
target.close()
def test_init_machine_with_nested_states(self):
a = State('A')
b = State('B')
b_1 = State('1', parent=b)
b_2 = State('2', parent=b)
m = self.stuff.machine_cls(states=[a, b])
self.assertEqual(b_1.name, 'B{0}1'.format(state_separator))
m.to("B{0}1".format(state_separator))
except ImportError:
pass
from transitions import MachineError
from transitions.extensions import HierarchicalMachine as Machine
from transitions.extensions.nesting import NestedState as State
from .utils import Stuff
from unittest import TestCase
try:
from unittest.mock import MagicMock
except ImportError:
from mock import MagicMock
nested_separator = State.separator
class TestTransitions(TestCase):
def setUp(self):
states = ['A', 'B',
{'name': 'C', 'children': ['1', '2', {'name': '3', 'children': ['a', 'b', 'c']}]},
'D', 'E', 'F']
self.stuff = Stuff(states, Machine)
def tearDown(self):
pass
def test_blueprint_reuse(self):
states = ['1', '2', '3']
transitions = [
def test_enter_exit_nested(self):
s = self.stuff
s.machine.add_transition('advance', 'A', 'C{0}3'.format(State.separator))
s.machine.add_transition('reverse', 'C', 'A')
s.machine.add_transition('lower', ['C{0}1'.format(State.separator),
'C{0}3'.format(State.separator)], 'C{0}3{0}a'.format(State.separator))
s.machine.add_transition('rise', 'C%s3' % State.separator, 'C%s1' % State.separator)
s.machine.add_transition('fast', 'A', 'C{0}3{0}a'.format(State.separator))
for state in s.machine.states.values():
state.on_enter.append('increase_level')
state.on_exit.append('decrease_level')
s.advance()
self.assertEqual(s.state, 'C%s3' % State.separator)
self.assertEqual(s.level, 2)
self.assertEqual(s.transitions, 3) # exit A; enter C,3
s.lower()
self.assertEqual(s.state, 'C{0}3{0}a'.format(State.separator))
self.assertEqual(s.level, 3)
self.assertEqual(s.transitions, 4) # enter a
s.rise()
self.assertEqual(s.state, 'C%s1' % State.separator)
self.assertEqual(s.level, 2)
self.assertEqual(collector.state, 'waiting')
# reuse counter instance with remap
collector = self.stuff.machine_cls(states=states_remap, transitions=transitions, initial='waiting')
collector.this_passes = self.stuff.this_passes
collector.collect() # collecting
collector.count() # let's see what we got
collector.increase() # counting_2
collector.increase() # counting_3
collector.done() # counting_done
self.assertEqual(collector.state, 'waiting')
# # same as above but with states and therefore stateless embedding
states_remap[2]['children'] = count_states
transitions.append(['increase', 'counting%s1' % State.separator, 'counting%s2' % State.separator])
transitions.append(['increase', 'counting%s2' % State.separator, 'counting%s3' % State.separator])
transitions.append(['done', 'counting%s3' % State.separator, 'waiting'])
collector = self.stuff.machine_cls(states=states_remap, transitions=transitions, initial='waiting')
collector.collect() # collecting
collector.count() # let's see what we got
collector.increase() # counting_2
collector.increase() # counting_3
collector.done() # counting_done
self.assertEqual(collector.state, 'waiting')
# check if counting_done was correctly omitted
collector.add_transition('fail', '*', 'counting%sdone' % State.separator)
with self.assertRaises(ValueError):
collector.fail()
tmp_states.insert(0, self._create_state(**state))
elif isinstance(state, HierarchicalMachine):
# set initial state of parent if it is None
if parent.initial is None:
parent.initial = state.initial
# (deep) copy only states not mentioned in remap
copied_states = [s for s in deepcopy(state.states).values() if s.name not in remap]
# inner_states are the root states of the passed machine
# which have be attached to the parent
inner_states = [s for s in copied_states if s.level == 0]
for inner in inner_states:
inner.parent = parent
tmp_states.extend(copied_states)
for trigger, event in state.events.items():
if trigger.startswith('to_'):
path = trigger[3:].split(NestedState.separator)
# do not copy auto_transitions since they would not be valid anymore;
# trigger and destination do not exist in the new environment
if path[0] in remap:
continue
ppath = parent.name.split(NestedState.separator)
path = ['to_' + ppath[0]] + ppath[1:] + path
trigger = '.'.join(path)
# (deep) copy transitions and
# adjust all transition start and end points to new state names
for transitions in deepcopy(event.transitions).values():
for transition in transitions:
src = transition.source
# transitions from remapped states will be filtered to prevent
# unexpected behaviour in the parent machine
if src in remap:
continue
def _filter_states(states, state_names, prefix=None):
prefix = prefix or []
result = []
for state in states:
pref = prefix + [state['name']]
if 'children' in state:
state['children'] = _filter_states(state['children'], state_names, prefix=pref)
result.append(state)
elif NestedState.separator.join(pref) in state_names:
result.append(state)
return result
def name(self):
""" The computed name of this state. """
if self.parent:
return self.parent.name + NestedState.separator + _super(NestedState, self).name
return _super(NestedState, self).name