Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_removeHandler():
    """Check that a handler added with ``addHandler`` can be removed again.

    A ``foo`` event is fired while ``on_foo`` is attached and its value is
    compared with ``"Hello World!"``.  The handler is then removed and a
    second ``foo`` event is fired and waited for.

    The manager is stopped in a ``finally`` clause so that a failing
    assertion does not leave a started manager behind.
    """
    m = Manager()
    m.start()
    try:
        method = m.addHandler(on_foo)

        waiter = pytest.WaitEvent(m, "foo")
        x = m.fire(foo())
        waiter.wait()

        assert x.value == "Hello World!"

        m.removeHandler(method)

        # Fire the event again now that the handler has been removed.
        waiter = pytest.WaitEvent(m, "foo")
        x = m.fire(foo())
        waiter.wait()
        # NOTE(review): nothing is asserted about the second event here;
        # confirm whether checks on ``x.value`` are expected at this point.
    finally:
        m.stop()
def test_tcp_basic(Poller, ipv6):
m = Manager() + Poller()
if ipv6:
tcp_server = TCP6Server(("::1", 0))
tcp_client = TCP6Client()
else:
tcp_server = TCPServer(0)
tcp_client = TCPClient()
server = Server() + tcp_server
client = Client() + tcp_client
server.register(m)
client.register(m)
m.start()
try:
def test_main():
    """Check ``repr()`` of a manager at three points in its life cycle.

    The representation is compared right after construction, after an
    ``App`` has been registered, and once the manager reports
    ``_running`` as true.  The manager is stopped in a ``finally`` clause
    so a failing assertion after ``start()`` cannot leave it running.
    """
    # Named ``ident`` rather than ``id`` to avoid shadowing the builtin.
    ident = "%s:%s" % (os.getpid(), current_thread().getName())

    m = Manager()
    # NOTE(review): the expected-repr literals in this test are empty, so
    # ``"" % ident`` raises TypeError (no placeholder to consume the
    # argument).  The original format strings appear to have been lost;
    # restore them from the authoritative copy of this test.
    assert repr(m) == "" % ident

    app = App()
    app.register(m)
    s = repr(m)
    assert s == "" % ident

    m.start()
    try:
        pytest.wait_for(m, "_running", True)
        sleep(0.1)

        s = repr(m)
        assert s == "" % ident
    finally:
        m.stop()
def test_tcp_reconnect(Poller, ipv6):
# XXX: Apparently this doesn't work on Windows either?
# XXX: UPDATE: Apparently Broken on Windows + Python 3.2
# TODO: Need to look into this. Find out why...
if pytest.PLATFORM == "win32" and pytest.PYVER[:2] >= (3, 2):
pytest.skip("Broken on Windows on Python 3.2")
m = Manager() + Poller()
if ipv6:
tcp_server = TCP6Server(("::1", 0))
tcp_client = TCP6Client()
else:
tcp_server = TCPServer(0)
tcp_client = TCPClient()
server = Server() + tcp_server
client = Client() + tcp_client
server.register(m)
client.register(m)
m.start()
try:
def main():
opts, args = parse_options()
if opts.jit and psyco:
psyco.full()
if ":" in opts.bind:
address, port = opts.bind.split(":")
port = int(port)
else:
address, port = opts.bind, 8000
bind = (address, port)
manager = Manager()
if opts.debug:
manager += Debugger(events=False)
poller = opts.poller.lower()
if poller == "poll":
Poller = Poll
elif poller == "epoll":
if EPoll is None:
warnings.warn("No epoll support available.")
Poller = Select
else:
Poller = EPoll
else:
Poller = Select
if ":" in opts.bind:
address, port = opts.bind.split(":")
port = int(port)
else:
address, port = opts.bind, 8000
if args:
x = args[0].split(":")
if len(x) > 1:
nodes = [(x[0], int(x[1]))]
else:
nodes = [(x[0], 8000)]
else:
nodes = []
manager = Manager()
bridge = Bridge(port, address=address, nodes=nodes)
manager += bridge
if opts.debug:
debugger = Debugger()
debugger.IgnoreEvents.extend(["Read", "Write"])
manager += debugger
if opts.server:
manager += Adder()
while True:
try:
manager.flush()
bridge.poll()
if ":" in opts.bind:
address, port = opts.bind.split(":")
port = int(port)
else:
address, port = opts.bind, 8000
if args:
x = args[0].split(":")
if len(x) > 1:
nodes = [(x[0], int(x[1]))]
else:
nodes = [(x[0], 8000)]
else:
nodes = []
manager = Manager()
debugger = Debugger()
debugger.IgnoreEvents.extend(["Read", "Write"])
manager += debugger
bridge = Bridge(port, address=address, nodes=nodes)
manager += bridge
manager.run()
def main():
    """Parse the command line, create the manager and pick a poller class.

    Steps performed by the visible part of this function:

    * optionally enable ``psyco`` when ``opts.jit`` is set and ``psyco``
      is truthy;
    * split ``opts.bind`` into ``(address, port)``, defaulting the port to
      8000 when no ``":"`` is present;
    * create a ``Manager`` and attach a ``Debugger`` when ``opts.debug``
      is set;
    * map ``opts.poller`` (case-insensitive) to ``Poll``, ``EPoll`` or
      ``Select``, falling back to ``Select`` for unknown names or when
      ``EPoll`` is ``None``.

    NOTE(review): ``bind`` and ``Poller`` are computed but not used in the
    lines visible here; the remainder of the function is presumably
    elsewhere -- confirm before removing them.
    """
    opts, args = parse_options()

    if opts.jit and psyco:
        psyco.full()

    if ":" in opts.bind:
        address, port = opts.bind.split(":")
        port = int(port)
    else:
        address, port = opts.bind, 8000

    bind = (address, port)

    manager = Manager()

    if opts.debug:
        manager += Debugger()

    poller = opts.poller.lower()
    if poller == "poll":
        Poller = Poll
    elif poller == "epoll":
        if EPoll is None:
            # Call form works on Python 2 and 3; the bare ``print``
            # statement is a SyntaxError on Python 3.
            print("No epoll support available - defaulting to Select...")
            Poller = Select
        else:
            Poller = EPoll
    else:
        Poller = Select
def main():
opts, args = parse_options()
if opts.speed and psyco:
psyco.full()
manager = Manager()
monitor = Monitor(opts)
manager += monitor
state = State(opts)
manager += state
if opts.debug:
manager += Debugger()
if opts.listen or args:
nodes = []
if args:
for node in args:
if ":" in node:
host, port = node.split(":")
def main():
    """Assemble the web server components selected on the command line.

    With ``opts.validate`` set, the application is wrapped in
    ``validator``, served through ``make_server`` until ``serve_forever``
    returns, and the process then exits with status 0.

    Otherwise a ``Manager`` is created and the following are registered
    on it: a ``Debugger`` (only when ``opts.debug`` is set), the poller
    returned by ``select_poller``, either ``BaseServer`` + ``HelloWorld``
    or ``Server`` + ``Root`` depending on ``opts.server``, and a
    ``Static`` component with directory listing enabled.
    """
    opts, args = parse_options()
    bind = parse_bind(opts.bind)

    if opts.validate:
        wsgi_app = validator(Application() + Root())
        httpd = make_server(bind[0], bind[1], wsgi_app)
        httpd.serve_forever()
        raise SystemExit(0)

    manager = Manager()

    if opts.debug:
        Debugger().register(manager)

    poller_cls = select_poller(opts.poller.lower())
    poller_cls().register(manager)

    use_base_server = opts.server.lower() == "base"
    if use_base_server:
        BaseServer(bind).register(manager)
        HelloWorld().register(manager)
    else:
        Server(bind).register(manager)
        Root().register(manager)

    # Serve the directory given as first positional argument, or the
    # current working directory when none was given.
    # NOTE(review): indentation was lost in the source; static serving is
    # taken to apply to both server kinds -- confirm.
    docroot = args[0] if args else os.getcwd()
    Static(docroot=docroot, dirlisting=True).register(manager)