Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, *sources: Event):
JoinOp.__init__(self)
self._sources = deque()
self._parent = None
self._set_sources(*sources)
def __init__(self, *sources):
JoinOp.__init__(self)
self._num_ready = 0 # number of sources with a pending result
self._source2cbs = defaultdict(list) # map from source to callbacks
if sources:
self._set_sources(*sources)
def __init__(self, *sources, partial=True):
JoinOp.__init__(self)
self._is_primed = partial
self._source2cbs = defaultdict(list) # map from source to callbacks
if sources:
self._set_sources(*sources)