Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from .test_core import TestTransitions as TestsCore
from .test_core import TestEnumsAsStates
from .utils import Stuff
try:
from unittest.mock import MagicMock
except ImportError:
from mock import MagicMock
try:
# Just to skip tests if graphviz not installed
import graphviz as pgv # @UnresolvedImport
except ImportError: # pragma: no cover
pgv = None
# Remember the separator that is active when this module is imported. The
# fixtures below assign it back to State.separator, because tests such as
# test_with_custom_separator overwrite that class attribute.
state_separator = State.separator
class Dummy(object):
    """Empty placeholder class that defines no attributes or methods of its own."""
class TestTransitions(TestsCore):
    """Runs the inherited core tests, plus nested-state checks, on a nested machine."""

    def setUp(self):
        """Build a fresh ``Stuff`` fixture whose machine class supports nesting."""
        grandchildren = {'name': '3', 'children': ['a', 'b', 'c']}
        nested_c = {'name': 'C', 'children': ['1', '2', grandchildren]}
        states = ['A', 'B', nested_c, 'D', 'E', 'F']
        nested_machine_cls = MachineFactory.get_predefined(nested=True)
        self.stuff = Stuff(states, nested_machine_cls)

    def tearDown(self):
        """Put back the separator that was captured at module level."""
        State.separator = state_separator

    def test_enter_exit_nested(self):
        """Check the model's state and level after entering and leaving nested states."""
        sep = State.separator
        c_1 = 'C%s1' % sep
        c_3 = 'C%s3' % sep
        c_3_a = 'C{0}3{0}a'.format(sep)

        model = self.stuff
        machine = model.machine

        # (trigger, source, dest) triples, registered in this exact order.
        wiring = (
            ('advance', 'A', c_1),
            ('reverse', 'C', 'A'),
            ('lower', c_1, c_3_a),
            ('rise', c_3, c_1),
            ('fast', 'A', c_3_a),
        )
        for trigger, source, dest in wiring:
            machine.add_transition(trigger, source, dest)

        # Every registered state bumps the level on entry and drops it on exit.
        for registered in machine.states.values():
            registered.on_enter.append('increase_level')
            registered.on_exit.append('decrease_level')

        model.advance()
        self.assertEqual(model.state, c_1)
        self.assertEqual(model.level, 2)

        model.lower()
        self.assertEqual(model.state, c_3_a)
        self.assertEqual(model.level, 3)

        model.rise()
        self.assertEqual(model.state, c_1)
{'trigger': 'increase', 'source': '1', 'dest': '2'},
{'trigger': 'increase', 'source': '2', 'dest': '3'},
{'trigger': 'decrease', 'source': '3', 'dest': '2'},
{'trigger': 'decrease', 'source': '1', 'dest': '1'},
{'trigger': 'reset', 'source': '*', 'dest': '1'},
{'trigger': 'done', 'source': '3', 'dest': 'finished'}
]
counter = Machine(states=states, transitions=transitions, initial='1')
new_states = ['A', 'B', {'name': 'C', 'children':
[counter, {'name': 'X', 'children': ['will', 'be', 'filtered', 'out']}],
'remap': {'finished': 'A', 'X': 'A'}}]
new_transitions = [
{'trigger': 'forward', 'source': 'A', 'dest': 'B'},
{'trigger': 'forward', 'source': 'B', 'dest': 'C%s1' % State.separator},
{'trigger': 'backward', 'source': 'C', 'dest': 'B'},
{'trigger': 'backward', 'source': 'B', 'dest': 'A'},
{'trigger': 'calc', 'source': '*', 'dest': 'C%s1' % State.separator},
]
walker = Machine(states=new_states, transitions=new_transitions, before_state_change='watch',
after_state_change='look_back', initial='A')
walker.watch = lambda: 'walk'
walker.look_back = lambda: 'look_back'
counter.increase()
counter.increase()
counter.done()
self.assertEqual(counter.state, 'finished')
def test_with_custom_separator(self):
    """Repeat selected nested tests with ``State.separator`` set to custom values."""
    # First a plain dot, then a unicode arrow (Python 2 uses the dot again).
    custom_separators = ('.', '.' if sys.version_info[0] < 3 else u'↦')
    for custom in custom_separators:
        State.separator = custom
        # The fixture is rebuilt before each of the first two tests; the third
        # test continues on the fixture left behind by the second one.
        self.setUp()
        self.test_enter_exit_nested()
        self.setUp()
        self.test_state_change_listeners()
        self.test_nested_auto_transitions()
def setUp(self):
    """Reset the separator, then build a ``Stuff`` fixture on a nested graph machine."""
    State.separator = state_separator
    grandchildren = {'name': '3', 'children': ['a', 'b', 'c']}
    nested_c = {'name': 'C', 'children': ['1', '2', grandchildren]}
    states = ['A', 'B', nested_c, 'D', 'E', 'F']
    graph_machine_cls = MachineFactory.get_predefined(graph=True, nested=True)
    self.stuff = Stuff(states, graph_machine_cls)
# trigger and destination do not exist in the new environment
if path[0] in remap:
continue
ppath = parent.name.split(NestedState.separator)
path = ['to_' + ppath[0]] + ppath[1:] + path
trigger = '.'.join(path)
# (deep) copy transitions and
# adjust all transition start and end points to new state names
for transitions in deepcopy(event.transitions).values():
for transition in transitions:
src = transition.source
# transitions from remapped states will be filtered to prevent
# unexpected behaviour in the parent machine
if src in remap:
continue
dst = parent.name + NestedState.separator + transition.dest\
if transition.dest not in remap else remap[transition.dest]
conditions, unless = [], []
for cond in transition.conditions:
# split a list in two lists based on the accessors (cond.target) truth value
(unless, conditions)[cond.target].append(cond.func)
self._buffered_transitions.append({'trigger': trigger,
'source': parent.name + NestedState.separator + src,
'dest': dst,
'conditions': conditions,
'unless': unless,
'prepare': transition.prepare,
'before': transition.before,
'after': transition.after})
elif isinstance(state, NestedState):
tmp_states.append(state)
def initial(self):
    """ Name of the child that is entered automatically when this state is
    entered, prefixed with this state's name and the separator. A falsy
    configured value (such as None) is returned unchanged. """
    try:
        if not self._initial:
            return self._initial
        return self.name + NestedState.separator + self._initial
    except TypeError:  # string concatenation failed, so we assume an Enum here
        return self.name + NestedState.separator + self._initial.name
for state in states:
name = prefix + state['name']
label = self._convert_state_attributes(state)
if 'children' in state:
cluster_name = "cluster_" + name
with container.subgraph(name=cluster_name,
graph_attr=self.machine.style_attributes['graph']['default']) as sub:
style = self.custom_styles['node'][name]
sub.graph_attr.update(label=label, rank='source', **self.machine.style_attributes['graph'][style])
self._cluster_states.append(name)
with sub.subgraph(name=cluster_name + '_root',
graph_attr={'label': '', 'color': 'None', 'rank': 'min'}) as root:
root.node(name + "_anchor", shape='point', fillcolor='black', width='0.1')
self._add_nodes(state['children'], sub, prefix=prefix + state['name'] + NestedState.separator)
else:
style = self.custom_styles['node'][name]
container.node(name, label=label, **self.machine.style_attributes['node'][style])
def _add_nodes(self, states, container, prefix=''):
    """Add every state in ``states`` to ``container``, recursing into children.

    A state without a 'children' entry becomes a single node. A state with
    children becomes a subgraph named ``"cluster_" + <prefixed name>`` that
    holds a ``"_root"`` subgraph with an anchor point node, followed by the
    child states, which are added with an extended prefix.
    """
    for state in states:
        full_name = prefix + state['name']
        label = self._convert_state_attributes(state)

        if 'children' not in state:
            node_shape = self.machine.style_attributes['node']['default']['shape']
            container.add_node(full_name, label=label, shape=node_shape)
            continue

        cluster = "cluster_" + full_name
        cluster_graph = container.add_subgraph(
            name=cluster, label=label, rank='source',
            **self.machine.style_attributes['graph']['default'])
        anchor_graph = cluster_graph.add_subgraph(
            name=cluster + '_root', label='', color=None, rank='min')
        anchor_graph.add_node(full_name + "_anchor", shape='point', fillcolor='black', width='0.1')
        self._add_nodes(state['children'], cluster_graph,
                        prefix=full_name + NestedState.separator)