Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import mitogen.utils
import ansible_mitogen.affinity
# Process-wide setup performed once, before any benchmark thread is created.
# NOTE(review): presumably adjusts interpreter thread-switching behaviour for
# the benchmark -- confirm against mitogen.utils.setup_gil().
mitogen.utils.setup_gil()
# NOTE(review): presumably applies the Ansible CPU affinity policy to this
# process -- confirm against ansible_mitogen.affinity.
ansible_mitogen.affinity.policy.assign_worker()
# Number of get()/put() iterations each flip_flop() thread performs; also used
# as the divisor (plus one) when the result is printed.
X = 20000
def flip_flop(ready, inp, out, count=None):
    """
    Benchmark worker: announce readiness, then repeatedly wait on one latch
    and signal another.

    :param ready:
        Queue-like object. ``put(None)`` is called on it exactly once, before
        the loop starts, so the caller can tell the worker is running.
    :param inp:
        Queue-like object whose ``get()`` is called once per iteration.
    :param out:
        Queue-like object whose ``put(None)`` is called once per iteration,
        after the corresponding ``inp.get()`` returns.
    :param int count:
        Number of iterations to run. Defaults to the module-level ``X``.
    """
    if count is None:
        count = X
    ready.put(None)
    # range() exists on both Python 2 and Python 3, whereas xrange() raises
    # NameError on Python 3.
    for _ in range(count):
        inp.get()
        out.put(None)
# Latch on which each worker thread announces that it has started.
ready = mitogen.core.Latch()
# The two latches the workers use to hand control back and forth.
l1 = mitogen.core.Latch()
l2 = mitogen.core.Latch()
# t1 waits on l1 and signals l2; t2 waits on l2 and signals l1.
t1 = threading.Thread(target=flip_flop, args=(ready, l1, l2))
t2 = threading.Thread(target=flip_flop, args=(ready, l2, l1))
t1.start()
t2.start()
# Each worker puts one item on `ready` before entering its loop, so two get()
# calls wait until both workers are running.
ready.get()
ready.get()
# Start the clock, then wake t1 to begin the exchange.
# NOTE(review): assumes mitogen.core.now() returns seconds -- confirm.
t0 = mitogen.core.now()
l1.put(None)
t1.join()
t2.join()
# Report the elapsed time divided by (1 + X), scaled by 1e6 and labelled usec.
print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+X))), 'usec')
testing of both.
"""
import logging
import random
import threading
import time
import mitogen.core
import mitogen.utils
# Enable the most detailed logging available before the test starts.
# NOTE(review): log_to_file() is called with no arguments; where the output
# goes by default is not visible here -- confirm.
mitogen.utils.log_to_file()
mitogen.core.IOLOG.setLevel(logging.DEBUG)
# NOTE(review): _v and _vv look like mitogen.core's verbose-logging switches
# -- confirm.
mitogen.core._v = True
mitogen.core._vv = True
# Shared latch that cons() blocks on.
l = mitogen.core.Latch()
# Module-level counters: cons() prints all three, increments `consumed` for
# every item received and `crash` when it exits through an exception.
consumed = 0
produced = 0
crash = 0
def cons():
    """
    Consumer loop: repeatedly take a value from the shared latch `l`, print
    the current counters, then sleep and spin for a time derived from that
    value.

    The loop never exits normally. Any exception raised inside it ends the
    loop and is recorded by incrementing the global `crash` counter.
    """
    global consumed, crash
    try:
        while True:
            delay = l.get()
            stats = (delay, consumed, produced, crash)
            print('got=%s consumed=%s produced=%s crash=%s' % stats)
            consumed += 1
            time.sleep(delay)
            # Follow the sleep with a short empty loop proportional to the
            # received value.
            for _ in range(int(delay * 1000)):
                pass
    except:
        # Deliberately catches everything: every way out of the loop is
        # counted as a crash.
        crash += 1
def sync_with_broker(broker, timeout=10.0):
    """
    Block the calling thread until the Broker thread has executed a callback
    deferred to it, acting as a synchronization barrier between the two.

    Because the callback is queued via ``broker.defer()``, returning from
    this function means the broker has completed at least one full IO loop.
    Useful for waiting on asynchronous work (such as earlier ``defer()``
    calls) running on the broker.

    :param broker:
        Object providing ``defer(func, *args)``.
    :param float timeout:
        Passed through to the latch's ``get()``.
        NOTE(review): presumably seconds, with an exception raised on expiry
        -- confirm against mitogen.core.Latch.get().
    """
    barrier = mitogen.core.Latch()
    wake = barrier.put
    # The broker thread releases the barrier by running wake(None).
    broker.defer(wake, None)
    barrier.get(timeout=timeout)
def _wait_or_start(self, spec, via=None):
latch = mitogen.core.Latch()
key = key_from_dict(via=via, **spec)
self._lock.acquire()
try:
response = self._response_by_key.get(key)
if response is not None:
self._refs_by_context[response['context']] += 1
latch.put(response)
return latch
latches = self._latches_by_key.setdefault(key, [])
first = len(latches) == 0
latches.append(latch)
finally:
self._lock.release()
if first:
def disconnect(self, context):
    """
    Disconnect a context and forget its stream, assuming the context is
    directly connected.

    Does nothing when no stream is found for `context`, or when the stream
    found has a ``protocol.remote_id`` different from ``context.context_id``.
    Otherwise the disconnect is started on the broker and this call blocks
    until the stream's 'disconnect' signal has fired.
    """
    stream = self.stream_by_id(context)
    if stream is None:
        return
    if stream.protocol.remote_id != context.context_id:
        return

    # Register the listener before scheduling the disconnect so the signal
    # cannot be missed.
    done = mitogen.core.Latch()
    mitogen.core.listen(stream, 'disconnect', done.put)

    def _start_disconnect():
        LOG.debug('Starting disconnect of %r', stream)
        stream.on_disconnect(self.broker)

    self.broker.defer(_start_disconnect)
    done.get()
def get(self, path):
    """
    Fetch a file from the cache, blocking until it is available.

    If `path` is already cached its entry is returned immediately. Otherwise
    a wake-up callback is appended to ``self._waiters[path]`` and the calling
    thread sleeps on a latch until that callback is invoked, after which the
    cached entry is returned.

    :param path:
        Cache key; must be an instance of ``mitogen.core.UnicodeType``.
    """
    assert isinstance(path, mitogen.core.UnicodeType)
    self._lock.acquire()
    try:
        if path in self._cache:
            return self._cache[path]
        arrived = mitogen.core.Latch()

        def wake():
            return arrived.put(None)

        # Registered under the lock so the check above and the registration
        # happen together.
        self._waiters.setdefault(path, []).append(wake)
    finally:
        self._lock.release()

    LOG.debug('%r.get(%r) waiting for uncached file to arrive', self, path)
    arrived.get()
    LOG.debug('%r.get(%r) -> %r', self, path, self._cache[path])
    return self._cache[path]
def on_fork():
    """
    Reinitialise process-wide Mitogen state in a freshly forked child.

    Should be called by any program integrating Mitogen each time the process
    is forked, from within the new child.
    """
    # Logging is reset ahead of every other step; this call must stay first.
    reset_logging_framework()
    fixup_prngs()
    mitogen.core.Latch._on_fork()
    mitogen.core.Side._on_fork()
    # Give the child its own fresh lock object.
    mitogen.core.ExternalContext.service_stub_lock = threading.Lock()

    # Look the module up in sys.modules rather than importing it, so it is
    # only touched when it has already been loaded.
    service_module = sys.modules.get('mitogen.service')
    if service_module:
        service_module._pool_lock = threading.Lock()