Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_hub_class(self, hub_type, sensor, sensor_name, capabilities):
stop_evt = Event()
@attach(sensor, name=sensor_name, capabilities=capabilities)
class TestHub(hub_type):
async def sensor_change(self):
pass
async def run(self):
pass
await stop_evt.wait()
return TestHub, stop_evt
import caproto as ca
import curio
import curio.network
from curio import socket
from ..server import AsyncLibraryLayer
from ..server.common import (VirtualCircuit as _VirtualCircuit,
Context as _Context)
class ServerExit(curio.KernelExit):
...
class Event(curio.Event):
async def wait(self, timeout=None):
if timeout is not None:
async with curio.ignore_after(timeout):
await super().wait()
return True
return False
else:
await super().wait()
return True
class UniversalQueue(curio.UniversalQueue):
def put(self, value):
super().put(value)
async def async_put(self, value):
def __init__(self, age, task_module, timeout, task_queue, ack_queue, log_level):
self.alive = True
# 发送task
self.task_queue = task_queue
# 发送task的结果
self.task_module = task_module
self.result_queue = curio.Queue()
self.ack_queue = ack_queue
self.timeout = timeout
self.done_ev = curio.Event()
self.age = age
self.name = 'Magne-ThreadWorker_%s' % age
self.logger = get_component_log(self.name, log_level)
return
def __init__(self):
self._event = Event()
self._result = None
self._exception = None
async def countdown(n):
while n > 0:
print('T-minus', n)
await curio.sleep(1)
n -= 1
async def friend(name):
print('Hi, my name is', name)
print('Playing Minecraft')
try:
await curio.sleep(1000)
except curio.CancelledError:
print(name, 'going home')
raise
start_evt = curio.Event()
def fib(n):
if n <= 2:
return 1
else:
return fib(n-1) + fib(n-2)
async def kid():
while True:
try:
print('Can I play?')
await curio.timeout_after(1, start_evt.wait)
break
except curio.TaskTimeout:
print('Wha!?!')
def __init__(self, host, port, *,
family=socket.AF_INET, backlog=100, ssl=None,
reuse_address=True, reuse_port=False):
self.ssl = ssl
self.socket = tcp_server_socket(
host, port, family, backlog, reuse_address, reuse_port)
self.ready = curio.Event()
self._sockaddr = None
def __init__(self, service: Service):
config = GRPCConfiguration(client_side=False)
self.connection = GRPCConnection(config=config)
self.service = service
self.request_message_queue = {}
self.write_event = curio.Event()
self.write_shutdown = False
self.stream = None
lib.sleep = curio.sleep
lib.task_manager = curio.TaskGroup
lib.timeout_after = curio.timeout_after
lib.sendall = curio_sendall
lib.recv = curio_recv
lib.sock_close = curio_close
lib.spawn = curio_spawn
lib.finalize_agen = curio.meta.finalize
lib.cancel_task_group = _event_loop_wrappers.curio_cancel
lib.unwrap_taskgrouperror = lambda error: [task.next_exc for task in error.failed]
lib.unwrap_result = lambda task: task.result
lib.Lock = curio.Lock
lib.Semaphore = curio.BoundedSemaphore
lib.Queue = curio.Queue
lib.Event = curio.Event
lib.Cancelled = curio.CancelledError
lib.TaskTimeout = curio.TaskTimeout
lib.TaskGroupError = curio.TaskGroupError
lib.wait_read = _low_level.wait_read_curio
lib.wait_write = _low_level.wait_write_curio