Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def func_wrapper(*args, **kwargs):
    """Run ``func(*args, **kwargs)`` to completion on a fresh curio kernel.

    ``func`` is not defined in this block; presumably it is the coroutine
    function captured from an enclosing decorator scope (confirm against the
    surrounding code). All positional and keyword arguments are forwarded
    to it unchanged. Nothing is returned.
    """
    kernel = curio.Kernel()
    try:
        kernel.run(func(*args, **kwargs))
    finally:
        # Shut the kernel down even when the coroutine raises; otherwise a
        # failing call would skip the shutdown step entirely.
        kernel.run(shutdown=True)
def test_curio_client_example():
    """Run the ``curio_client_simple`` example's ``main`` coroutine on a curio kernel.

    The test makes no assertions of its own; it passes when ``main`` runs
    without raising.
    """
    # Imported lazily, inside the test, exactly as the example is only
    # needed here.
    from caproto.examples.curio_client_simple import main as example_main

    with curio.Kernel() as runner:
        example_coro = example_main()
        runner.run(example_coro)
print('new from caput', data['new'])
# check value from database compared to value from caput output
assert db_new == data['new']
# check value from database compared to value the test expects
assert db_new == check_value
async def task():
    """Spawn ``curio_server`` as a background task and run ``client`` against it.

    ``curio_server`` and ``client`` are not defined in this block; they are
    taken from the surrounding scope. The spawned server task is cancelled
    whether ``client`` returns normally or raises.
    """
    background_server = await curio.spawn(curio_server)
    try:
        await client()
    finally:
        # Always cancel the spawned server once the client is finished.
        await background_server.cancel()
with curio.Kernel() as kernel:
kernel.run(task)
print('done')
zenline.encode('utf-8'))
################################################################
# Run the server
################################################################
async def main():
    """Serve ``http_serve`` on 127.0.0.1:8080 until SIGTERM or SIGINT arrives.

    The TCP server is spawned as a separate curio task; once one of the two
    signals has been received, that task is cancelled.
    """
    listener = curio.tcp_server('127.0.0.1', 8080, http_serve)
    server = await curio.spawn(listener)

    # Block here until a termination or interrupt signal is delivered.
    stop_signals = curio.SignalSet(signal.SIGTERM, signal.SIGINT)
    await stop_signals.wait()

    await server.cancel()
if __name__ == "__main__":
    # Script entry point: create the kernel, announce the address, then run
    # main() and shut the kernel down once it returns.
    kernel = curio.Kernel()
    print("Listening on http://localhost:8080")
    entry_point = main()
    kernel.run(entry_point, shutdown=True)
"--version", action="version", version=f"%(prog)s {__version__}"
)
parser.add_argument("server", nargs="+", type=get_server)
args = parser.parse_args(arguments)
if args.verbose == 0:
level = logging.ERROR
elif args.verbose == 1:
level = logging.INFO
else:
level = logging.DEBUG
gvars.logger.setLevel(level)
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (50000, 50000))
except Exception:
gvars.logger.warning("Require root permission to allocate resources")
kernel = curio.Kernel()
try:
kernel.run(multi_server(*args.server))
except Exception as e:
gvars.logger.exception(str(e))
except KeyboardInterrupt:
kernel.run(shutdown=True)
)
parser.add_argument(
"-v", dest="verbose", action="count", default=0, help="print verbose output"
)
parser.add_argument(
"--version", action="version", version=f"%(prog)s {__version__}"
)
parser.add_argument("server", nargs="+", type=get_server)
args = parser.parse_args()
global verbose
verbose = args.verbose
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (50000, 50000))
except Exception as e:
print("Require root permission to allocate resources")
kernel = curio.Kernel()
try:
kernel.run(multi_server(*args.server))
except Exception as e:
traceback.print_exc()
for k, v in kernel._selector.get_map().items():
print(k, v, file=sys.stderr)
for conn in connections:
print("|", conn, file=sys.stderr)
except KeyboardInterrupt:
kernel.run(shutdown=True)
print()
if not line:
break
print(line)
await writer.write(
b'''HTTP/1.0 200 OK\r
Content-type: text/plain\r
\r
If you're seeing this, it probably worked. Yay!
''')
await writer.write(time.asctime().encode('ascii'))
await writer.close()
await reader.close()
if __name__ == '__main__':
    # Script entry point: serve `handler` over SSL on port 10000 (host '').
    kernel = curio.Kernel()

    # Build the SSL context and load the certificate/key pair named by the
    # CERTFILE and KEYFILE constants (defined outside this block).
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(CERTFILE, KEYFILE)

    server_coro = curio.run_server('', 10000, handler, ssl=ssl_context)
    kernel.run(server_coro)
def teardown(self):
    """Clear ``self.chan1`` on a temporary curio kernel, then exit ``self.cm``."""

    async def _clear_channel():
        await self.chan1.clear()

    with curio.Kernel() as runner:
        runner.run(_clear_channel())

    # NOTE(review): StopIteration is passed as the exception type with no
    # value or traceback -- confirm this is what self.cm expects on exit.
    self.cm.__exit__(StopIteration, None, None)