Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self) -> None:
try:
curio.run(self._run_test)
except KeyboardInterrupt:
print('Cancelled')
def my_test_ipv6():
server, bind_addr, _ = get_server("http://user:password@[::1]:0")
bind_address = f"{bind_addr[0]}:{bind_addr[1]}"
client = get_client(f"http://user:password@{bind_address}")
curio.run(main(make_request(client), server))
def curio_test(x):
async def main():
for n in range(COUNT):
r = await curio.run_in_process(fib, x)
start = time.time()
curio.run(main())
end = time.time()
print('Curio:', end-start)
def run(pvdb, *, interfaces=None, log_pv_names=False):
"""
A synchronous function that runs server, catches KeyboardInterrupt at exit.
"""
try:
return curio.run(
functools.partial(
start_server,
pvdb,
interfaces=interfaces,
log_pv_names=log_pv_names))
except KeyboardInterrupt:
return
async with TaskGroup() as group:
await group.spawn(pipe, client, remote, enc.encrypt)
await group.spawn(pipe, remote, client, enc.decrypt)
print('pipe: %r <-> %r' % tuple([s.getsockname() for s in (client, remote)]))
print('closed:', addr)
async def pipe(src, dst, crypt):
while 1:
data = await src.recv(1024)
if not data:
break
await dst.sendall(crypt(data))
if __name__ == '__main__':
run(socks5_server, ('',25000))
return
elif isinstance(event, Error):
return
await send_outgoing_data()
parser = ArgumentParser(description='A sample IRC client')
parser.add_argument('host', help='address of irc server (foo.bar.baz or foo.bar.baz:port)')
parser.add_argument('nickname', help='nickname to register as')
parser.add_argument('channel', help='channel to join once registered')
parser.add_argument('message', help='message to send once joined')
args = parser.parse_args()
host, _, port = args.host.partition(':')
curio.run(send_message_to_channel(host, int(port or 6667), args.nickname, args.channel,
args.message))
request_data = requests[event.request_id][2]
if event.data:
request_data.extend(event.data)
else:
params, keep_connection, request_data = requests.pop(event.request_id)
handle_request(conn, event.request_id, params, request_data)
if not keep_connection:
break
data = conn.data_to_send()
if data:
await client.sendall(data)
if __name__ == '__main__':
try:
run(fcgi_server(('', 9500)))
except (KeyboardInterrupt, SystemExit):
pass
# this time.
blocked_streams = list(self.flow_control_events.keys())
for stream_id in blocked_streams:
event = self.flow_control_events.pop(stream_id)
await event.set()
return
if __name__ == '__main__':
host = sys.argv[2] if len(sys.argv) > 2 else "localhost"
print("Try GETting:")
print(" On OSX after 'brew install curl --with-c-ares --with-libidn --with-nghttp2 --with-openssl':")
print("/usr/local/opt/curl/bin/curl --tlsv1.2 --http2 -k https://localhost:5000/bundle.js")
print("Or open a browser to: https://localhost:5000/")
print(" (Accept all the warnings)")
run(h2_server((host, 5000), sys.argv[1],
"{}.crt.pem".format(host),
"{}.key".format(host)), with_monitor=True)