Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
args = ARGS.parse_args()
if ':' in args.host:
args.host, port = args.host.split(':', 1)
args.port = int(port)
tulip.set_event_loop(aiozmq.new_event_loop())
superviser = Superviser(args)
superviser.start()
'--port', action="store", dest='port',
default=9999, type=int, help='Port number')
if __name__ == '__main__':
args = ARGS.parse_args()
if ':' in args.host:
args.host, port = args.host.split(':', 1)
args.port = int(port)
if (not (args.server or args.client)) or (args.server and args.client):
print('Please specify --server or --client\n')
ARGS.print_help()
else:
loop = aiozmq.new_event_loop()
loop.add_signal_handler(signal.SIGINT, loop.stop)
tulip.set_event_loop(loop)
if args.server:
start_server(loop, args.host, args.port)
else:
start_client(loop, args.host, args.port)
loop.run_forever()
ARGS.add_argument(
'--port', action="store", dest='port',
default=9999, type=int, help='Port number')
if __name__ == '__main__':
args = ARGS.parse_args()
if ':' in args.host:
args.host, port = args.host.split(':', 1)
args.port = int(port)
if (not (args.server or args.client)) or (args.server and args.client):
print('Please specify --server or --client\n')
ARGS.print_help()
else:
loop = aiozmq.new_event_loop()
loop.add_signal_handler(signal.SIGINT, loop.stop)
tulip.set_event_loop(loop)
if '--server' in sys.argv:
start_server(loop, (args.host, args.port))
else:
start_client(loop, (args.host, args.port))
loop.run_forever()
if args.ssl:
here = os.path.join(os.path.dirname(__file__), 'tests')
if args.certfile:
certfile = args.certfile or os.path.join(here, 'sample.crt')
keyfile = args.keyfile or os.path.join(here, 'sample.key')
else:
certfile = os.path.join(here, 'sample.crt')
keyfile = os.path.join(here, 'sample.key')
sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
sslcontext.load_cert_chain(certfile, keyfile)
else:
sslcontext = None
loop = aiozmq.new_event_loop()
tulip.set_event_loop(loop)
f = loop.start_serving(
lambda: HttpServer(debug=True, keep_alive=75), args.host, args.port,
ssl=sslcontext)
socks = loop.run_until_complete(f)
print('serving on', socks[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
import tulip
import zmq
import aiozmq
loop = aiozmq.new_event_loop()
tulip.set_event_loop(loop)
# server
ctx = aiozmq.Context(loop=loop)
#sock1 = ctx.socket(zmq.PUSH)
#sock1.bind('ipc:///tmp/zmqtest')
def send_data():
sock1.send_pyobj(('this', 'is', 'a', 'python', 'tuple'))
yield from tulip.sleep(1.0)
sock1.send_pyobj({'hi': 1234})
yield from tulip.sleep(1.0)
sock1.send_pyobj(
({'this': ['is a more complicated object', ':)']}, 42, 42, 42))
yield from tulip.sleep(1.0)