Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_all_can_be_called_multiple_times(futures):
    """Calling ``get_all`` twice on the same futures gives equal results."""
    # Give each of the first three futures its own index as its value.
    for index in range(3):
        futures[index].set(index)

    first_pass = get_all(futures)
    second_pass = get_all(futures)

    assert first_pass == second_pass
def test_get_all_raises_timeout_if_not_all_futures_are_available(futures):
    """``get_all`` with ``timeout=0`` raises ``Timeout`` when a future is unset."""
    # Only the first two futures get a value; futures[2] is left unset.
    for index in (0, 1):
        futures[index].set(index)

    with pytest.raises(Timeout):
        get_all(futures, timeout=0)
def update(self, async_=False, **kwargs):
    """Clear the cache and ask every proxy to update itself.

    ``async`` became a reserved keyword in Python 3.7, so the flag is now
    named ``async_``. The legacy spelling is still accepted as a keyword
    (e.g. ``update(**{'async': True})``), and positional callers are
    unaffected.

    :param async_: when true, return immediately without waiting for the
        per-proxy update futures
    :returns: ``None`` when asynchronous, otherwise the result of
        ``pykka.get_all()`` on the per-proxy futures
    :raises TypeError: on any keyword argument other than the legacy
        ``async`` alias
    """
    async_ = kwargs.pop('async', async_)
    if kwargs:
        raise TypeError(
            'update() got unexpected keyword argument(s): %s'
            % ', '.join(sorted(kwargs))
        )

    # NOTE(review): the block nesting was reconstructed from flattened
    # source; confirm the lock is only meant to guard the cache reset.
    with self.lock:
        self.cache.clear()
    with DebugTimer(logger, 'Updating podcast directories'):
        futures = [p.update() for p in self.proxies.values()]
        return None if async_ else pykka.get_all(futures)
tl_track = context.core.playback.get_current_tl_track()
futures = {
'tracklist.length': context.core.tracklist.get_length(),
'tracklist.version': context.core.tracklist.get_version(),
'mixer.volume': context.core.mixer.get_volume(),
'tracklist.consume': context.core.tracklist.get_consume(),
'tracklist.random': context.core.tracklist.get_random(),
'tracklist.repeat': context.core.tracklist.get_repeat(),
'tracklist.single': context.core.tracklist.get_single(),
'playback.state': context.core.playback.get_state(),
'playback.current_tl_track': tl_track,
'tracklist.index': context.core.tracklist.index(tl_track.get()),
'playback.time_position': context.core.playback.get_time_position(),
}
pykka.get_all(futures.values())
result = [
('volume', _status_volume(futures)),
('repeat', _status_repeat(futures)),
('random', _status_random(futures)),
('single', _status_single(futures)),
('consume', _status_consume(futures)),
('playlist', _status_playlist_version(futures)),
('playlistlength', _status_playlist_length(futures)),
('xfade', _status_xfade(futures)),
('state', _status_state(futures)),
]
# TODO: add nextsong and nextsongid
if futures['playback.current_tl_track'].get() is not None:
result.append(('song', _status_songpos(futures)))
result.append(('songid', _status_songid(futures)))
if futures['playback.state'].get() in (
def refresh(self, uri=None, async_=False, **kwargs):
    """Trigger a refresh on one proxy or on all of them.

    ``async`` became a reserved keyword in Python 3.7, so the flag is now
    named ``async_``. The legacy spelling is still accepted as a keyword
    (e.g. ``refresh(**{'async': True})``), and positional callers are
    unaffected.

    :param uri: when given and different from ``self.root_uri``, only the
        proxy returned by ``self._lookup(uri)`` is refreshed; otherwise
        every proxy in ``self._proxies`` is refreshed
    :param async_: when true, do not wait for the refresh futures
    :raises TypeError: on any keyword argument other than the legacy
        ``async`` alias
    """
    async_ = kwargs.pop('async', async_)
    if kwargs:
        raise TypeError(
            'refresh() got unexpected keyword argument(s): %s'
            % ', '.join(sorted(kwargs))
        )

    if uri and uri != self.root_uri:
        _, uriref, proxy = self._lookup(uri)
        futures = [proxy.refresh(uriref)]
    else:
        futures = [p.refresh() for p in self._proxies.values()]

    # Block until all refreshes have completed unless told not to.
    if not async_:
        pykka.get_all(futures)
def run(pool_size, *ips):
    """Fan ``ips`` out over a pool of ``Resolver`` actors and print the results.

    Each IP is handed to a resolver proxy's ``resolve()`` in round-robin
    order. The resulting futures are gathered with ``pykka.get_all()`` and
    the ``(ip, result)`` pairs are pretty-printed.

    All actors are stopped in a ``finally`` clause, so a failure while
    starting, dispatching or gathering does not leave actors running.

    :param pool_size: number of ``Resolver`` actors to start
    :param ips: the values passed to ``resolve()``
    """
    try:
        # Start resolvers
        resolvers = [Resolver.start().proxy() for _ in range(pool_size)]

        # Distribute work by mapping IPs to resolvers (not blocking)
        hosts = [
            resolvers[i % len(resolvers)].resolve(ip)
            for i, ip in enumerate(ips)
        ]

        # Gather results (blocking)
        ip_to_host = zip(ips, pykka.get_all(hosts))
        pprint.pprint(list(ip_to_host))
    finally:
        # Clean up, even if something above raised
        pykka.ActorRegistry.stop_all()
'tracklist.length': context.core.tracklist.get_length(),
'tracklist.version': context.core.tracklist.get_version(),
'mixer.volume': context.core.mixer.get_volume(),
'tracklist.consume': context.core.tracklist.get_consume(),
'tracklist.random': context.core.tracklist.get_random(),
'tracklist.repeat': context.core.tracklist.get_repeat(),
'tracklist.single': context.core.tracklist.get_single(),
'playback.state': context.core.playback.get_state(),
'playback.current_tl_track': tl_track,
'tracklist.index': context.core.tracklist.index(tl_track.get()),
'tracklist.next_tlid': next_tlid,
'tracklist.next_index': context.core.tracklist.index(
tlid=next_tlid.get()),
'playback.time_position': context.core.playback.get_time_position(),
}
pykka.get_all(futures.values())
result = [
('volume', _status_volume(futures)),
('repeat', _status_repeat(futures)),
('random', _status_random(futures)),
('single', _status_single(futures)),
('consume', _status_consume(futures)),
('playlist', _status_playlist_version(futures)),
('playlistlength', _status_playlist_length(futures)),
('xfade', _status_xfade(futures)),
('state', _status_state(futures)),
]
if futures['playback.current_tl_track'].get() is not None:
result.append(('song', _status_songpos(futures)))
result.append(('songid', _status_songid(futures)))
if futures['tracklist.next_tlid'].get() is not None:
result.append(('nextsong', _status_nextsongpos(futures)))
@property
def root_directories(self):
    """Return the truthy ``root_directory`` values of all proxies.

    Reads ``root_directory`` from every proxy in ``self.proxies``, waits
    for all of them with ``pykka.get_all()`` and drops falsy results.
    """
    pending = []
    for proxy in self.proxies.values():
        pending.append(proxy.root_directory)

    resolved = pykka.get_all(pending)
    return list(filter(None, resolved))