Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_handler_state(self):
idle = State("I")
cmd = State("C")
idle.add_transitions([Transition(cmd,
self.match,
lambda x: idle)])
fsm = FSM([idle, cmd])
self.commands = []
self.assertEqual(fsm.state, idle)
fsm.process("250 OK\n")
self.assertEqual(fsm.state, idle)
got_name = State('got_name')
got_cookie = State('got_cookie')
reading_key = State('got_key')
parser_state = ParserState()
# initial state; we want "client-name" or it's an error
init.add_transitions([
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
Transition(init, lambda line: not line.startswith('client-name '), parse_error),
])
# next up is "descriptor-cookie" or it's an error
got_name.add_transitions([
Transition(got_cookie, lambda line: line.startswith('descriptor-cookie '), parser_state.set_cookie),
Transition(init, lambda line: not line.startswith('descriptor-cookie '), parse_error),
])
# the "interesting bit": there's either a client-name if we're a
# "basic" file, or an RSA key (with "client-key" before it)
got_cookie.add_transitions([
Transition(reading_key, lambda line: line.startswith('client-key'), None),
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
])
# if we're reading an RSA key, we accumulate it in current_key.key
# until we hit a line starting with "client-name"
reading_key.add_transitions([
Transition(reading_key, lambda line: not line.startswith('client-name'), parser_state.add_key_line),
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
])
waiting_p = State("waiting_p")
waiting_s = State("waiting_s")
def ignorable_line(x):
x = x.strip()
return x in ['.', 'OK', ''] or x.startswith('ns/')
waiting_r.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_r.add_transition(Transition(waiting_s, lambda x: x.startswith('r '), self._router_begin))
# FIXME use better method/func than die!!
waiting_r.add_transition(Transition(waiting_r, lambda x: not x.startswith('r '), die('Expected "r " while parsing routers not "%s"')))
waiting_s.add_transition(Transition(waiting_w, lambda x: x.startswith('s '), self._router_flags))
waiting_s.add_transition(Transition(waiting_s, lambda x: x.startswith('a '), self._router_address))
waiting_s.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_s.add_transition(Transition(waiting_r, lambda x: not x.startswith('s ') and not x.startswith('a '), die('Expected "s " while parsing routers not "%s"')))
waiting_s.add_transition(Transition(waiting_r, lambda x: x.strip() == '.', None))
waiting_w.add_transition(Transition(waiting_p, lambda x: x.startswith('w '), self._router_bandwidth))
waiting_w.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_w.add_transition(Transition(waiting_s, lambda x: x.startswith('r '), self._router_begin)) # "w" lines are optional
waiting_w.add_transition(Transition(waiting_r, lambda x: not x.startswith('w '), die('Expected "w " while parsing routers not "%s"')))
waiting_w.add_transition(Transition(waiting_r, lambda x: x.strip() == '.', None))
waiting_p.add_transition(Transition(waiting_r, lambda x: x.startswith('p '), self._router_policy))
waiting_p.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_p.add_transition(Transition(waiting_s, lambda x: x.startswith('r '), self._router_begin)) # "p" lines are optional
waiting_p.add_transition(Transition(waiting_r, lambda x: x[:2] != 'p ', die('Expected "p " while parsing routers not "%s"')))
waiting_p.add_transition(Transition(waiting_r, lambda x: x.strip() == '.', None))
self._machine = FSM([waiting_r, waiting_s, waiting_w, waiting_p])
self._relay_attrs = None
waiting_r = State("waiting_r")
waiting_w = State("waiting_w")
waiting_p = State("waiting_p")
waiting_s = State("waiting_s")
def ignorable_line(x):
x = x.strip()
return x in ['.', 'OK', ''] or x.startswith('ns/')
waiting_r.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_r.add_transition(Transition(waiting_s, lambda x: x.startswith('r '), self._router_begin))
# FIXME use better method/func than die!!
waiting_r.add_transition(Transition(waiting_r, lambda x: not x.startswith('r '), die('Expected "r " while parsing routers not "%s"')))
waiting_s.add_transition(Transition(waiting_w, lambda x: x.startswith('s '), self._router_flags))
waiting_s.add_transition(Transition(waiting_s, lambda x: x.startswith('a '), self._router_address))
waiting_s.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_s.add_transition(Transition(waiting_r, lambda x: not x.startswith('s ') and not x.startswith('a '), die('Expected "s " while parsing routers not "%s"')))
waiting_s.add_transition(Transition(waiting_r, lambda x: x.strip() == '.', None))
waiting_w.add_transition(Transition(waiting_p, lambda x: x.startswith('w '), self._router_bandwidth))
waiting_w.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_w.add_transition(Transition(waiting_s, lambda x: x.startswith('r '), self._router_begin)) # "w" lines are optional
waiting_w.add_transition(Transition(waiting_r, lambda x: not x.startswith('w '), die('Expected "w " while parsing routers not "%s"')))
waiting_w.add_transition(Transition(waiting_r, lambda x: x.strip() == '.', None))
waiting_p.add_transition(Transition(waiting_r, lambda x: x.startswith('p '), self._router_policy))
waiting_p.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_p.add_transition(Transition(waiting_s, lambda x: x.startswith('r '), self._router_begin)) # "p" lines are optional
waiting_p.add_transition(Transition(waiting_r, lambda x: x[:2] != 'p ', die('Expected "p " while parsing routers not "%s"')))
waiting_p.add_transition(Transition(waiting_r, lambda x: x.strip() == '.', None))
# initial state; we want "client-name" or it's an error
init.add_transitions([
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
Transition(init, lambda line: not line.startswith('client-name '), parse_error),
])
# next up is "descriptor-cookie" or it's an error
got_name.add_transitions([
Transition(got_cookie, lambda line: line.startswith('descriptor-cookie '), parser_state.set_cookie),
Transition(init, lambda line: not line.startswith('descriptor-cookie '), parse_error),
])
# the "interesting bit": there's either a client-name if we're a
# "basic" file, or an RSA key (with "client-key" before it)
got_cookie.add_transitions([
Transition(reading_key, lambda line: line.startswith('client-key'), None),
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
])
# if we're reading an RSA key, we accumulate it in current_key.key
# until we hit a line starting with "client-name"
reading_key.add_transitions([
Transition(reading_key, lambda line: not line.startswith('client-name'), parser_state.add_key_line),
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
])
# create our FSM and parse the data
fsm = FSM([init, got_name, got_cookie, reading_key])
for line in stream.readlines():
fsm.process(line.strip())
parser_state.create_key() # make sure we get the "last" one
self.cookie = self.cookie[:-2]
def add_key_line(self, line):
self.key.append(line)
from txtorcon.spaghetti import FSM, State, Transition
init = State('init')
got_name = State('got_name')
got_cookie = State('got_cookie')
reading_key = State('got_key')
parser_state = ParserState()
# initial state; we want "client-name" or it's an error
init.add_transitions([
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
Transition(init, lambda line: not line.startswith('client-name '), parse_error),
])
# next up is "descriptor-cookie" or it's an error
got_name.add_transitions([
Transition(got_cookie, lambda line: line.startswith('descriptor-cookie '), parser_state.set_cookie),
Transition(init, lambda line: not line.startswith('descriptor-cookie '), parse_error),
])
# the "interesting bit": there's either a client-name if we're a
# "basic" file, or an RSA key (with "client-key" before it)
got_cookie.add_transitions([
Transition(reading_key, lambda line: line.startswith('client-key'), None),
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
])
got_name.add_transitions([
Transition(got_cookie, lambda line: line.startswith('descriptor-cookie '), parser_state.set_cookie),
Transition(init, lambda line: not line.startswith('descriptor-cookie '), parse_error),
])
# the "interesting bit": there's either a client-name if we're a
# "basic" file, or an RSA key (with "client-key" before it)
got_cookie.add_transitions([
Transition(reading_key, lambda line: line.startswith('client-key'), None),
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
])
# if we're reading an RSA key, we accumulate it in current_key.key
# until we hit a line starting with "client-name"
reading_key.add_transitions([
Transition(reading_key, lambda line: not line.startswith('client-name'), parser_state.add_key_line),
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
])
# create our FSM and parse the data
fsm = FSM([init, got_name, got_cookie, reading_key])
for line in stream.readlines():
fsm.process(line.strip())
parser_state.create_key() # make sure we get the "last" one
return parser_state.keys
Transition(got_cookie, lambda line: line.startswith('descriptor-cookie '), parser_state.set_cookie),
Transition(init, lambda line: not line.startswith('descriptor-cookie '), parse_error),
])
# the "interesting bit": there's either a client-name if we're a
# "basic" file, or an RSA key (with "client-key" before it)
got_cookie.add_transitions([
Transition(reading_key, lambda line: line.startswith('client-key'), None),
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
])
# if we're reading an RSA key, we accumulate it in current_key.key
# until we hit a line starting with "client-name"
reading_key.add_transitions([
Transition(reading_key, lambda line: not line.startswith('client-name'), parser_state.add_key_line),
Transition(got_name, lambda line: line.startswith('client-name '), parser_state.set_name),
])
# create our FSM and parse the data
fsm = FSM([init, got_name, got_cookie, reading_key])
for line in stream.readlines():
fsm.process(line.strip())
parser_state.create_key() # make sure we get the "last" one
return parser_state.keys
waiting_r = State("waiting_r")
waiting_w = State("waiting_w")
waiting_p = State("waiting_p")
waiting_s = State("waiting_s")
def ignorable_line(x):
x = x.strip()
return x in ['.', 'OK', ''] or x.startswith('ns/')
waiting_r.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_r.add_transition(Transition(waiting_s, lambda x: x.startswith('r '), self._router_begin))
# FIXME use better method/func than die!!
waiting_r.add_transition(Transition(waiting_r, lambda x: not x.startswith('r '), die('Expected "r " while parsing routers not "%s"')))
waiting_s.add_transition(Transition(waiting_w, lambda x: x.startswith('s '), self._router_flags))
waiting_s.add_transition(Transition(waiting_s, lambda x: x.startswith('a '), self._router_address))
waiting_s.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_s.add_transition(Transition(waiting_r, lambda x: not x.startswith('s ') and not x.startswith('a '), die('Expected "s " while parsing routers not "%s"')))
waiting_s.add_transition(Transition(waiting_r, lambda x: x.strip() == '.', None))
waiting_w.add_transition(Transition(waiting_p, lambda x: x.startswith('w '), self._router_bandwidth))
waiting_w.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_w.add_transition(Transition(waiting_s, lambda x: x.startswith('r '), self._router_begin)) # "w" lines are optional
waiting_w.add_transition(Transition(waiting_r, lambda x: not x.startswith('w '), die('Expected "w " while parsing routers not "%s"')))
waiting_w.add_transition(Transition(waiting_r, lambda x: x.strip() == '.', None))
waiting_p.add_transition(Transition(waiting_r, lambda x: x.startswith('p '), self._router_policy))
waiting_p.add_transition(Transition(waiting_r, ignorable_line, None))
waiting_p.add_transition(Transition(waiting_s, lambda x: x.startswith('r '), self._router_begin)) # "p" lines are optional
waiting_p.add_transition(Transition(waiting_r, lambda x: x[:2] != 'p ', die('Expected "p " while parsing routers not "%s"')))
waiting_p.add_transition(Transition(waiting_r, lambda x: x.strip() == '.', None))