Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _try_emit(self):
t = loop.time()
q = self._q
times = self._time_q
costs = self._cost_q
# forget old emit times
while times and t - times[0] > self._interval:
times.popleft()
costs.popleft()
# emit values while not exceeding the limit
while q:
args, cost = q[0]
if self._cost_func:
cost = self._cost_func(*args)
total_cost = cost + sum(costs)
else:
def on_source(self, *args):
time = loop.time()
delta = time - self._last_time
self._last_time = time
if self._on_first:
if delta >= self._interval:
self.emit(*args)
else:
if self._handle:
self._handle.cancel()
self._handle = loop.call_at(
time + self._interval, self._delayed_emit, *args)
def __init__(self, timeout, source=None):
Op.__init__(self, source)
if source.done():
return
self._timeout = timeout
self._last_time = loop.time()
self._handle = None
self._schedule()
def on_source(self, *args):
self._last_time = loop.time()