Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, port: int) -> None:
self._port = port
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._tasks = curio.Queue()
self._keep_running = True
async def queue_test(kernel):
print('Queue Test')
q = curio.Queue()
async def queue_worker():
while True:
item = await q.get()
await curio.sleep(4)
print('Worker processed', item)
q.task_done()
kernel.add_task(queue_worker())
for n in range(10):
await q.put(n)
await q.join()
print('Queue Test Worker finished')
async def test_worker():
import logging
logger = logging.getLogger('test_connection')
log_handler = logging.StreamHandler()
log_format = logging.Formatter('%(levelname)s %(asctime)s %(message)s')
log_handler.setFormatter(log_format)
logger.addHandler(log_handler)
logger.setLevel(logging.DEBUG)
getter_queue = curio.Queue()
putter_queue = curio.Queue()
wp = MagneWorkerPool(worker_nums=2, worker_timeout=15,
getter_queue=getter_queue, putter_queue=putter_queue,
task_module_path='magne.process_worker.demo_task', logger=logger,
)
stask = await curio.spawn(wp.start)
await stask.join()
await curio.sleep(5)
body = {'channel_number': 1,
'delivery_tag': 1,
'consumer_tag': 1,
'exchange': 1,
'routing_key': 1,
'data': json.dumps({'func': 'sleep', 'args': [10]}),
}
await wp.getter_queue.put(json.dumps(body))
async def system():
robot = Robot('Vernie')
remote = Remote('remote')
# Define a message passing queue from the remote to the robot
remote.tell_robot = Queue()
robot.listen_remote = remote.tell_robot
async def test_connection():
# for test
import logging
import os
logger = logging.getLogger('test_connection')
log_handler = logging.StreamHandler()
log_format = logging.Formatter('%(levelname)s %(asctime)s %(pathname)s %(lineno)d %(message)s')
log_handler.setFormatter(log_format)
logger.addHandler(log_handler)
logger.setLevel(logging.DEBUG)
getter_queue = curio.Queue()
putter_queue = curio.Queue()
c = MagneConnection(['tc1', 'tc2'], logger, getter_queue, putter_queue)
logger.info('connection pid: %s' % os.getpid())
await c.connect()
await c.run()
await curio.sleep(10)
await c.pre_close()
await c.close()
return
async def task_b():
q = curio.Queue()
tid = kernel.add_task(task_a(q))
print('Created task', tid)
await curio.sleep(60)
print('About to cancel')
kernel.cancel_task(tid)
print('Done with cancel')
async def start(self):
self.logger.info('starting in pid: %s' % (os.getpid()))
amqp_task_queue = curio.Queue()
ack_queue = curio.Queue()
self.con = Connection(ack_queue, amqp_task_queue, list(self._tasks.keys()), self.amqp_url, self.qos, self.log_level)
con_run_task = await curio.spawn(self.con.run)
await con_run_task.join()
self.worker_pool = ThreadWorkerPool(self.thread_nums, self.worker_timeout, self.task_module,
ack_queue, amqp_task_queue, self.log_level
)
pool_task = await curio.spawn(self.worker_pool.run)
await pool_task.join()
signal_task = await curio.spawn(self.watch_signal)
await signal_task.join()
return
def __init__(self):
self.broadcaster = ca.Broadcaster(our_role=ca.CLIENT)
self.log = self.broadcaster.log
self.command_bundle_queue = curio.Queue()
self.broadcaster_command_condition = curio.Condition()
# UDP socket broadcasting to CA servers
self.udp_sock = ca.bcast_socket(socket)
self.broadcaster.our_address = self.udp_sock.getsockname()[:2]
self.registered = False # refers to RepeaterRegisterRequest
self.loop_ready_event = curio.Event()
self.unanswered_searches = {} # map search id (cid) to name
self.search_results = {} # map name to address
self.environ = ca.get_environment_variables()
self.ca_server_port = self.environ['EPICS_CA_SERVER_PORT']
async def run_ws(client, addr):
in_q, out_q = Queue(), Queue()
ws_task = await spawn(ws_adapter, in_q, out_q, client, addr)
await handler(in_q, out_q)
await out_q.put(None)
await ws_task.join() # Wait until it's done.
# Curio will close the socket for us after we drop off here.
print("Master task done.")