# - load the tools
#
if tools:
tools = {tool.tag: tool for tool in [clz() for clz in tools if issubclass(clz, Tool)] if tool.tag}
logger.info('supporting tools %s' % ', '.join(tools.keys()))
#
# - start the life-cycle actor which will pass our hints (as a json object) to its underlying sub-process
# - start our coordinator which will connect to zookeeper and attempt to lead the cluster
# - upon grabbing the lock the model actor will start and implement the configuration process
# - the hints are a convenient bag for any data that may change at runtime and needs to be returned (via
# the HTTP POST /info request)
# - what's being registered in zookeeper is immutable though and decorated with additional details by
# the coordinator (especially the pod index which is derived from zookeeper)
#
latch = ThreadingFuture()
logger.info('starting %s.%s (marathon) @ %s' % (hints['namespace'], hints['cluster'], hints['node']))
breadcrumbs = deepcopy(hints)
hints['metrics'] = {}
hints['dependencies'] = model.depends_on
env.update({'ochopod': json.dumps(hints)})
executor = lifecycle.start(env, latch, hints)
coordinator = Coordinator.start(
hints['zk'].split(','),
hints['namespace'],
hints['cluster'],
int(hints['port']),
breadcrumbs,
model,
hints)
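#
# - a minimal, self-contained sketch of the latch pattern used throughout
#   these snippets: a pykka.ThreadingFuture is handed to an actor inside a
#   message and the caller blocks on it (the Echo actor and message keys
#   below are illustrative, not part of ochopod)
#
import pykka

class Echo(pykka.ThreadingActor):

    def on_receive(self, message):
        #
        # - reply through the latch instead of returning a value
        #
        message['latch'].set(message['data'])

actor = Echo.start()
latch = pykka.ThreadingFuture()
actor.tell({'data': 'ping', 'latch': latch})
assert latch.get(timeout=5.0) == 'ping'
actor.stop()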
#
uris = query.get('uri', [])
if uris:
tracks = []
for uri in uris:
tracks += self.lookup(uri)
if len(uris) == 1:
uri = uris[0]
else:
uri = 'spotify:search'
return SearchResult(uri=uri, tracks=tracks)
spotify_query = self._translate_search_query(query)
logger.debug('Spotify search query: %s' % spotify_query)
future = pykka.ThreadingFuture()
def callback(results, userdata=None):
search_result = SearchResult(
uri='spotify:search:%s' % (
urllib.quote(results.query().encode('utf-8'))),
albums=[
translator.to_mopidy_album(a) for a in results.albums()],
artists=[
translator.to_mopidy_artist(a) for a in results.artists()],
tracks=[
translator.to_mopidy_track(t) for t in results.tracks()])
future.set(search_result)
if not self.backend.spotify.connected.is_set():
logger.debug('Not connected: Spotify search cancelled')
return SearchResult(uri='spotify:search')
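#
# - hedged sketch of how the callback/future pair above resolves: the actual
#   libspotify search dispatch is elided in this snippet, so the direct call
#   below merely stands in for the session thread firing the callback
#
future = pykka.ThreadingFuture()

def callback(results, userdata=None):
    future.set(results)

callback('fake-results')
assert future.get(timeout=5.0) == 'fake-results'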
def _control(task, timeout='60'):
logger.debug('http in -> /control/%s' % task)
if task not in ['check', 'on', 'off', 'ok', 'kill', 'signal']:
#
# - fail on an HTTP 400 if the request is not supported
#
return '{}', 400, {'Content-Type': 'application/json; charset=utf-8'}
try:
ts = time.time()
latch = ThreadingFuture()
executor.tell({'request': task, 'latch': latch, 'data': request.data})
js, code = latch.get(timeout=int(timeout))
ms = 1000 * (time.time() - ts)
logger.debug('http out -> HTTP %s (%d ms)' % (code, ms))
return json.dumps(js), code, {'Content-Type': 'application/json; charset=utf-8'}
except Timeout:
#
# - we failed to receive a response within the specified timeout
# - gracefully fail on an HTTP 408
#
return '{}', 408, {'Content-Type': 'application/json; charset=utf-8'}
except ActorDeadError:
    #
    # - the executor actor is already gone (e.g it was shut down underneath us)
    # - gracefully fail on an HTTP 410
    #
    return '{}', 410, {'Content-Type': 'application/json; charset=utf-8'}
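#
# - hedged usage sketch for the endpoint above (host, port and the exact
#   route binding are assumptions for illustration only)
#
import requests

reply = requests.post('http://localhost:8080/control/check')
assert reply.status_code in [200, 400, 408, 410]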
def run(proxy, func, timeout=60.0):
"""
Helper asking the zookeeper proxy actor to run the specified closure, blocking until either a response
is received or the timeout is reached.
:type proxy: :class:`pykka.ActorRef`
:type func: callable
:type timeout: float
:param proxy: our ancillary zookeeper proxy actor
:param func: the closure to run within the proxy actor
:param timeout: optional timeout in seconds
:rtype: dict
"""
try:
latch = pykka.ThreadingFuture()
proxy.tell(
{
'request': 'execute',
'latch': latch,
'function': func
})
out = latch.get(timeout=timeout)
if isinstance(out, Exception):
raise out
return out
except Timeout:
assert 0, 'request timeout'
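#
# - hedged usage sketch for run(): the proxy actor is assumed to invoke the
#   closure with the live kazoo client (the znode path below is illustrative)
#
def snapshot(zk):
    value, stat = zk.get('/some/znode')
    return {'value': value}

js = run(proxy, snapshot)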
def _request(self, tokens):
#
# - we use this helper to schedule commands internally (mostly used to switch
# the pod on/off)
#
for token in tokens:
self.commands.append((token, {}, ThreadingFuture()))
def videos(self):
"""
Loads the list of videos of a playlist, using one API call for every 50
videos fetched. For every page fetched, Video.load_info is called to
start loading video info in a separate thread.
"""
self._videos = pykka.ThreadingFuture()
def job():
data = {"items": []}
page = ""
while (
page is not None
and len(data["items"]) < self.playlist_max_videos
):
try:
max_results = min(
int(self.playlist_max_videos) - len(data["items"]), 50
)
result = self.api.list_playlistitems(
self.id, page, max_results
)
except Exception as e:
    # stop paginating if the API call fails
    logger.error("error while fetching playlist items: %s", e)
    break
if hasattr(data, 'lock') and data.lock:
try:
data.lock.release()
except ConnectionClosedError:
pass
data.lock = None
lock.acquire(timeout=SAMPLING)
logger.debug('%s : lock acquired @ %s, now leading' % (self.path, self.prefix))
data.lock = lock
#
# - we have the lock (i.e. we are the leader)
# - start the controller actor
#
data.latch = ThreadingFuture()
data.controller = self.model.start(data.zk, self.id, self.hints, self.scope, self.tag, self.port, data.latch)
return 'lock', data, 0
except LockTimeout:
pass
#
# - we could not obtain the lock
# - blindly loop back and attempt to get it again
#
return 'spin', data, 0
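#
# - self-contained sketch of the same spin-until-leader pattern written
#   against kazoo directly (connection string and lock path are illustrative)
#
from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout

zk = KazooClient(hosts='localhost:2181')
zk.start()
lock = zk.Lock('/example/coordinator')
while True:
    try:
        #
        # - acquire() raises LockTimeout if we cannot grab the lock in time
        # - loop back and blindly attempt to get it again
        #
        lock.acquire(timeout=5.0)
        break
    except LockTimeout:
        pass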
def _set_api_data(self, fields, item):
"""
Sets the given 'fields' of 'self' based on the 'item'
data retrieved through the API.
"""
for k in fields:
_k = "_" + k
future = self.__dict__.get(_k)
if not future:
future = self.__dict__[_k] = pykka.ThreadingFuture()
if not future._queue.empty(): # hack, no public is_set()
continue
if not item:
val = None
elif k == "title":
val = item["snippet"]["title"]
elif k == "channel":
val = item["snippet"]["channelTitle"]
elif k == "length":
# convert PT1H2M10S to 3730
m = re.search(
    r"P((?P<weeks>\d+)W)?"
    + r"((?P<days>\d+)D)?"
    + r"T((?P<hours>\d+)H)?"
    + r"((?P<minutes>\d+)M)?"
    + r"((?P<seconds>\d+)S)?",
    item["contentDetails"]["duration"],
)
def shutdown(actor_ref, timeout=None):
"""
Shuts a state-machine down and waits for it to acknowledge it is down, using a latch.
:type actor_ref: :class:`pykka.ActorRef`
:param actor_ref: a pykka actor reference
:type timeout: float
:param timeout: optional timeout in seconds
"""
try:
if not actor_ref:
return
latch = ThreadingFuture()
actor_ref.tell({'request': 'shutdown', 'latch': latch})
latch.get(timeout=timeout)
except Timeout:
pass
except ActorDeadError:
pass
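#
# - hedged usage sketch: tearing down the coordinator and executor actors
#   started in the first snippet, in reverse order of startup
#
shutdown(coordinator, timeout=5.0)
shutdown(executor, timeout=5.0)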