Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _deliver_queued_messages(self):
    """Flush everything that was queued while this Promise was unresolved.

    Each queued method call is re-dispatched through eventually(), then
    every watcher Deferred is fired with the resolved target.  Both queues
    are deleted afterwards, so late arrivals fail loudly instead of being
    silently queued.
    """
    queued_calls = self._pendingMethods
    for call in queued_calls:
        name, posargs, kwargs, resolver = call
        eventually(self._deliver, name, posargs, kwargs, resolver)
    del self._pendingMethods
    # Q: what are the partial-ordering semantics between queued messages
    # and when() clauses that are waiting on this Promise to be resolved?
    waiting = self._watchers
    for deferred in waiting:
        eventually(deferred.callback, self._target)
    del self._watchers
def remote_subscribe_to_all(self, observer, catch_up=False):
    """Create a Subscription feeding *observer* and start it asynchronously.

    The actual subscribe happens via eventually(), so this remote call can
    return before any events are delivered to the observer.
    """
    subscription = Subscription(observer, self._logger)
    eventually(subscription.subscribe, catch_up)
    return subscription
def negotiationFailed(self):
    """Log a failed negotiation and notify the interested parties.

    Tells the connector (client side only, and only once — the phase is
    marked ABANDONED afterwards) and fires the optional debug callback.
    """
    reason = self.failureReason
    # TODO: consider logging this unconditionally.. it shouldn't happen
    # very often, but if it does, it may take a long time to track down.
    # ACTUALLY, parallel connection-hints cause the slower connection to
    # hit here with a duplicate connection reason all the time.
    self.log("Negotiation.negotiationFailed", failure=reason,
             level=OPERATIONAL)
    self.stopNegotiationTimer()
    should_notify_connector = (self.receive_phase != ABANDONED
                               and self.isClient)
    if should_notify_connector:
        eventually(self.connector.negotiationFailed, self.factory, reason)
    self.receive_phase = ABANDONED
    debug_cb = self.options.get("debug_negotiationFailed_cb")
    if debug_cb:
        # historically documented as receiving a NegotiationError, but in
        # fact it is handed a Failure
        eventually(debug_cb, reason)
def finished_recording(self):
    """Finalize the incident report files and announce the new incident.

    Ordering matters here: the compressed file is closed and renamed into
    place first; only then is the uncompressed copy discarded.
    """
    self.f2.close()
    # atomically publish the finished .bz2 under its final name
    move_into_place(self.abs_filename_bz2_tmp, self.abs_filename_bz2)
    # the compressed logfile has closed successfully. We no longer care
    # about the uncompressed one.
    self.f1.close()
    os.unlink(self.abs_filename)
    # now we can tell the world about our new incident report
    eventually(self.logger.incident_recorded,
               self.abs_filename_bz2, self.name, self.trigger)
def abandonAllRequests(self, why):
    """Fail every outstanding request with *why*.

    Connection-lost failures are mapped to DeadReferenceError (one fresh
    exception per request, carrying that request) so application code only
    needs to check for a single exception type.
    """
    for req in list(self.waitingForAnswers.values()):
        # BUG FIX: the original rebound `why` itself to the new Failure,
        # so after the first iteration `why.check(*LOST_CONNECTION_ERRORS)`
        # no longer matched and every remaining request was failed with the
        # FIRST request's DeadReferenceError (wrong `req` embedded).  Use a
        # per-iteration local so each request really gets its own error.
        req_why = why
        if why.check(*LOST_CONNECTION_ERRORS):
            # map all connection-lost errors to DeadReferenceError, so
            # application code only needs to check for one exception type
            tubid = None
            # since we're creating a new exception object for each call,
            # let's add more information to it
            if self.remote_tubref:
                tubid = self.remote_tubref.getShortTubID()
            e = DeadReferenceError("Connection was lost", tubid, req)
            req_why = failure.Failure(e)
        eventually(req.fail, req_why)
# use self.logger.buffers, copy events into logfile
events = list(self.logger.get_buffered_events())
# BUG FIX: the old cmp-based sort (events.sort(lambda a,b: cmp(...))) is
# Python-2-only: `cmp` is gone and list.sort() takes no positional
# comparator in Python 3.  A key function is equivalent on both.
events.sort(key=lambda a: a['num'])
for e in events:
    flogfile.serialize_wrapper(self.f1, e,
                               from_=self.tubid_s, rx_time=now)
    flogfile.serialize_wrapper(self.f2, e,
                               from_=self.tubid_s, rx_time=now)
self.f1.flush()
# the BZ2File has no flush method
if self.TRAILING_DELAY is None:
    self.active = False
    eventually(self.finished_recording)
else:
    # now we wait for the trailing events to arrive
    self.timer = reactor.callLater(self.TRAILING_DELAY,
                                   self.stop_recording)
def _notifyConnectionLostWatchers(self):
    """
    Fire every callback that is waiting to learn about the loss of this
    broker's connection, clearing the watcher list first so each watcher
    runs at most once.
    """
    pending, self._connectionLostWatchers = self._connectionLostWatchers, None
    for notify in pending:
        eventually(notify)
def _ready(res):
    # Passthrough callback/errback: the pending call has become ready
    # again, so clear the waiting flag and schedule the next queued call.
    # NOTE(review): `self` and `d` are captured from the enclosing method,
    # whose definition is outside this chunk — confirm against full file.
    self._waiting_for_call_to_be_ready = False
    eventually(self.doNextCall)
    # return res unchanged so the Deferred's result is not disturbed
    return res
d.addBoth(_ready)