Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
repr() conversion shows same as str() plus embedded connection used by command
"""
moler_conn = ThreadedMolerConnection(decoder=lambda data: data.decode("utf-8"))
class LsCmd(Command):
    """Minimal ``ls`` command; it only needs a command string to be printable."""

    def __init__(self, options='-l', connection=None):
        """Bind the command to *connection* and build its ``ls`` command line."""
        super(LsCmd, self).__init__(connection=connection)
        ls_template = 'ls {}'
        self.command_string = ls_template.format(options)

    def data_received(self, data, recv_time):
        """Ignore incoming data - parsing is not important here."""
        return None
# Command under test, created on top of moler_conn.
ls = LsCmd(connection=moler_conn)
# (1) command with ThreadedMolerConnection not glued to ext-io
#     - the expected repr shows the unknown send target as [?]
assert 'LsCmd("ls -l", id:{}, using ThreadedMolerConnection(id:{})-->[?])'.format(instance_id(ls), instance_id(moler_conn)) == repr(ls)
# TODO: add test for
# (2) command with ThreadedMolerConnection glued to ext-io
#     - after FifoBuffer is built on moler_conn, the expected repr shows
#       repr() of the buffer's write method in place of [?]
ext_io_connection = FifoBuffer(moler_connection=moler_conn)
how2send_repr = repr(ext_io_connection.write)
assert 'LsCmd("ls -l", id:{}, using ThreadedMolerConnection(id:{})-->[{}])'.format(instance_id(ls), instance_id(moler_conn), how2send_repr) == repr(ls)
# TODO: move ThreadedMolerConnection(id:{})-->[{}])'.format(instance_id(moler_conn), how2send_repr) into ThreadedMolerConnection __repr__ test
# TODO: and here just:
# assert 'LsCmd("ls -l", id:{}, using {})'.format(instance_id(ls), repr(moler_conn)) == repr(ls)
# (3) command without connection
#     - the expected repr keeps the "using " prefix but nothing follows it
ls.connection = None
assert 'LsCmd("ls -l", id:{}, using )'.format(instance_id(ls)) == repr(ls)
# That lifetime may start even before this submit() if observer is command and we have commands queue.
#
# As a corner case runner.wait_for() may timeout before feeding coroutine has started.
#
# duration of submit() is measured as around 0.0007sec (depends on machine).
# Fetch the event loop to schedule on; its_new is truthy when the loop should be
# remembered by this runner (presumably: it was just created - confirm in
# thread_secure_get_event_loop).
event_loop, its_new = thread_secure_get_event_loop(logger_name=self.logger.name)
if its_new:
# The append to _started_ev_loops is guarded by AsyncioRunner.runner_lock.
with AsyncioRunner.runner_lock:
self._started_ev_loops.append(event_loop)
# _start_feeding() returns the data receiver that is handed over to feed() below.
subscribed_data_receiver = self._start_feeding(connection_observer, observer_lock)
self.logger.debug("scheduling feed({})".format(connection_observer))
# Schedule the feed() coroutine on event_loop; the returned future is what gets tracked.
connection_observer_future = asyncio.ensure_future(self.feed(connection_observer,
subscribed_data_receiver,
observer_lock),
loop=event_loop)
self.logger.debug("runner submit() returning - future: {}:{}".format(instance_id(connection_observer_future),
connection_observer_future))
# A future that is already done at this point is unexpected: either it already
# has a result (logged as a warning, execution continues) or it holds an
# exception (logged, then re-raised to the caller).
if connection_observer_future.done():
# most probably we have some exception during ensure_future(); it should be stored inside future
try:
too_early_result = connection_observer_future.result()
err_msg = "PROBLEM: future returned {} already in runner.submit()".format(too_early_result)
self.logger.warning("go background: {} - {}".format(connection_observer, err_msg))
except Exception as err:
err_msg = "PROBLEM: future raised {!r} during runner.submit()".format(err)
self.logger.warning("go background: {} - {}".format(connection_observer, err_msg))
# logger.exception() additionally records the traceback of the active exception
self.logger.exception(err_msg)
raise
# Keep the future in _submitted_futures, keyed by its id().
self._submitted_futures[id(connection_observer_future)] = connection_observer_future
# need injecting new attribute inside asyncio.Future object
# to allow passing lock to wait_for()
# Turn off the "destroy pending task" log message for this future.
future._log_destroy_pending = False
# _run_until_complete_cb stops the future's loop once the future is done,
# which is what lets run_forever() below return.
future.add_done_callback(_run_until_complete_cb)
try:
event_loop.run_forever()
except BaseException:
if new_task and future.done() and not future.cancelled():
# The coroutine raised a BaseException. Consume the exception
# to not log a warning, the caller doesn't have access to the
# local task.
future.exception()
raise
finally:
# Always detach the callback, whether run_forever() returned or raised.
future.remove_done_callback(_run_until_complete_cb)
# The loop stopped although the future is still pending: report it on stderr
# and in the "moler" logger, then fail with RuntimeError.
if not future.done():
fut_id = instance_id(future)
msg = "not done future in _run_until_complete(fut_id = {}, {})".format(fut_id, future)
sys.stderr.write(msg + "\n")
logging.getLogger("moler").debug(msg)
raise RuntimeError('Event loop stopped before Future completed. (fut_id = {}, {})'.format(fut_id, future))
# result() returns the future's value or re-raises the exception stored in it.
return future.result()
def _num_channels_of_transport(cls, transport):
    """Return the number of channel ids recorded for *transport*.

    Channels are looked up by the transport's instance id; a transport with
    no record counts as 0.
    """
    recorded_channels = cls._channels_of_transport.get(instance_id(transport), ())
    return len(recorded_channels)
def _remember_channel_of_transport(cls, channel):
    """Record the id of *channel* under the instance id of its transport."""
    transport_key = instance_id(channel.get_transport())
    # Read the channel id before touching the mapping, so a failing get_id()
    # leaves the mapping unchanged.
    new_channel_id = channel.get_id()
    cls._channels_of_transport.setdefault(transport_key, []).append(new_channel_id)
def stop(self):
    """Log that stop() was requested, then stop the loop via the parent class.

    The same message is sent to the 'moler' logger three times, with
    levels_to_go_up of 1, 2 and 3.
    """
    moler_logger = logging.getLogger('moler')
    msg = "Called loop.stop() of {}:{}".format(instance_id(self), self)
    # Kept as three explicit calls from this frame: levels_to_go_up presumably
    # selects a caller frame, so no extra call layer is introduced here.
    debug_into_logger(moler_logger, msg=msg, levels_to_go_up=1)
    debug_into_logger(moler_logger, msg=msg, levels_to_go_up=2)
    debug_into_logger(moler_logger, msg=msg, levels_to_go_up=3)
    super(LoudEventLoop, self).stop()
Observer key is pair: (self-id, function-id)
"""
# Bound method: take the object it is bound to; the key uses that object's
# instance id, the value holds only a weak proxy to the object.
try:
self_or_none = six.get_method_self(subscriber)
self_id = instance_id(self_or_none)
self_or_none = weakref.proxy(self_or_none)
except AttributeError:
self_id = 0 # default for not bound methods
self_or_none = None
# Unwrap the underlying function of a method; anything that is not a method
# is used as the function itself.
try:
func = six.get_method_function(subscriber)
except AttributeError:
func = subscriber
function_id = instance_id(func)
# key: (self-id, function-id); value: (weak proxy of self or None, weak proxy of function)
subscription_key = (self_id, function_id)
subscription_value = (self_or_none, weakref.proxy(func))
return subscription_key, subscription_value
def _run_until_complete(event_loop, future):
"""Run until the Future is done.
If the argument is a coroutine, it is wrapped in a Task.
WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.
Return the Future's result, or raise its exception.
"""
event_loop._check_closed()
# new_task is True when the argument is not a future yet, i.e. ensure_future()
# below has to wrap it.
new_task = not asyncio.futures.isfuture(future)
# Ids are taken before and after ensure_future() so the debug message can
# relate the passed-in object to the future actually being run.
fut_id = instance_id(future)
future = asyncio.tasks.ensure_future(future, loop=event_loop)
task_id = instance_id(future)
msg = "task for future id ({}) future = asyncio.tasks.ensure_future: (task_id = {}, {})".format(fut_id, task_id,
future)
# The message goes both to stderr and to the "moler" logger.
sys.stderr.write(msg + "\n")
logging.getLogger("moler").debug(msg)
if new_task:
# An exception is raised if the future didn't complete, so there
# is no need to log the "destroy pending task" message
future._log_destroy_pending = False
# _run_until_complete_cb stops the future's loop when the future is done.
future.add_done_callback(_run_until_complete_cb)
try:
event_loop.run_forever()
except BaseException:
def _run_until_complete_cb(fut):
    """Done-callback: log the call and stop the loop that owns *fut*.

    Nothing is logged or stopped when the future failed with a BaseException
    that is not an Exception - run_forever() has already finished in that
    case (Issue #22429).
    """
    failure = fut._exception
    loop_already_finished = isinstance(failure, BaseException) and not isinstance(failure, Exception)
    if not loop_already_finished:
        msg = "_run_until_complete_cb(fut_id = {}, {})".format(instance_id(fut), fut)
        sys.stderr.write(msg + "\n")
        logging.getLogger("moler").debug(msg)
        fut._loop.stop()
def __str__(self):
    """Return ``ClassName(<detect_patterns>, id:<instance id>)``."""
    class_name = self.__class__.__name__
    patterns = self.detect_patterns
    return '{}({}, id:{})'.format(class_name, patterns, instance_id(self))