Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"position=%(position)s",
{
"rate": segment.rate,
"format": Gst.Format.get_name(segment.format),
"start": segment.start,
"stop": segment.stop,
"position": segment.position,
},
)
position_ms = segment.position // Gst.MSECOND
logger.debug("Audio event: position_changed(position=%r)", position_ms)
AudioListener.send("position_changed", position=position_ms)
# TODO: create a player class which replaces the actors internals
class Audio(pykka.ThreadingActor):
    """
    Audio output through `GStreamer <https://gstreamer.freedesktop.org/>`_.
    """

    #: The GStreamer state mapped to :class:`mopidy.audio.PlaybackState`
    state = PlaybackState.STOPPED

    #: The software mixing interface :class:`mopidy.audio.actor.SoftwareMixer`
    mixer = None

    def __init__(self, config, mixer):
        """Store the configuration and set the initial target state.

        :param config: configuration object, kept on the instance as
            ``_config``
        :param mixer: not used in the visible part of this initializer
        """
        super().__init__()
        self._config = config
        # The target state starts out as ``Gst.State.NULL``.
        self._target_state = Gst.State.NULL
def time_it(func):
    """Call ``func`` once and print how long the call took.

    The elapsed time is measured with :func:`time.perf_counter`, a
    monotonic high-resolution clock, so the reported duration cannot be
    distorted (or turn negative) when the system clock is adjusted while
    ``func`` runs, which a ``time.time()`` difference is exposed to.

    :param func: zero-argument callable to run; its ``__name__`` is used
        in the printed report
    :returns: ``None``; the return value of ``func`` is discarded
    """
    start = time.perf_counter()
    func()
    elapsed = time.perf_counter() - start
    print('{!r} took {:.3f}s'.format(func.__name__, elapsed))
class SomeObject(object):
    """Simple object with two class attributes and one no-op method."""

    cat = 'bar.cat'

    # Class-level default; individual instances may override it.
    pykka_traversable = False

    def func(self):
        """Do nothing and return ``None``."""
        return None
class AnActor(ThreadingActor):
    """Actor with a plain attribute, a nested object and a no-op method."""

    foo = 'foo'

    # One ``SomeObject`` shared by the class. The flag is set on this
    # instance only; the ``SomeObject`` class default is left untouched.
    bar = SomeObject()
    bar.pykka_traversable = True

    def __init__(self):
        """Initialise the actor base class and set ``cat`` to ``'quox'``."""
        super(AnActor, self).__init__()
        self.cat = 'quox'

    def func(self):
        """Do nothing and return ``None``."""
        return None
def test_direct_plain_attribute_access():
actor = AnActor.start().proxy()
for _ in range(10000):
used in tests of the frontends.
"""
from __future__ import absolute_import, unicode_literals
from mopidy import backend
from mopidy.models import Playlist, Ref, SearchResult
import pykka
def create_proxy(config=None, audio=None):
    """Start a ``DummyBackend`` actor and return a proxy to it.

    :param config: forwarded to ``DummyBackend.start`` as ``config``
    :param audio: forwarded to ``DummyBackend.start`` as ``audio``
    :returns: the result of calling ``proxy()`` on the started actor
    """
    actor_ref = DummyBackend.start(config=config, audio=audio)
    return actor_ref.proxy()
class DummyBackend(pykka.ThreadingActor, backend.Backend):
    """Backend actor wired up with providers for the ``dummy`` scheme."""

    def __init__(self, config, audio):
        """Create the library, playback and playlists providers.

        :param config: accepted but not used by this initializer
        :param audio: when truthy, playback goes through
            ``backend.PlaybackProvider``; otherwise
            ``DummyPlaybackProvider`` is used. It is passed to the chosen
            provider either way.
        """
        super(DummyBackend, self).__init__()

        self.library = DummyLibraryProvider(backend=self)

        # Pick the provider class first, then build it with the same
        # keyword arguments in both cases.
        if audio:
            playback_cls = backend.PlaybackProvider
        else:
            playback_cls = DummyPlaybackProvider
        self.playback = playback_cls(audio=audio, backend=self)

        self.playlists = DummyPlaylistsProvider(backend=self)
        self.uri_schemes = ['dummy']
class DummyLibraryProvider(backend.LibraryProvider):
    """Library provider that ``DummyBackend`` installs as its ``library``."""

    # Directory ref with URI ``dummy:/`` and name ``dummy``.
    root_directory = Ref.directory(uri='dummy:/', name='dummy')
import pykka
from . import Extension
from .directory import PodcastDirectory
from .models import Ref
from .timers import DebugTimer
from .uritools import urisplit, uriunsplit
logger = logging.getLogger(__name__)
def _transform(base, ref):
    """Return a copy of ``ref`` whose URI has been transformed by ``base``.

    :param base: object providing ``transform(uri, strict=True)``; the
        result must provide ``geturi()``
    :param ref: object with a ``uri`` attribute and a ``copy(uri=...)``
        method
    :returns: ``ref.copy(...)`` called with the transformed URI
    """
    transformed = base.transform(ref.uri, strict=True)
    new_uri = transformed.geturi()
    return ref.copy(uri=new_uri)
class PodcastDirectoryActor(pykka.ThreadingActor):
    """Actor that wraps one podcast directory instance."""

    def __init__(self, directory, config, timeout):
        """Instantiate ``directory`` and build its root directory ref.

        :param directory: callable invoked as ``directory(config, timeout)``
        :param config: first argument passed to ``directory``
        :param timeout: second argument passed to ``directory``
        """
        super(PodcastDirectoryActor, self).__init__()
        instance = directory(config, timeout)
        self.directory = instance
        # Root URI uses the directory name as authority and "/" as path.
        root_uri = uriunsplit([None, instance.name, '/', None, None])
        root_name = instance.display_name or instance.name
        self.root_directory = Ref.directory(uri=root_uri, name=root_name)

    def get(self, uri):
        """Return ``self.directory.get(uri)``, wrapped in a ``DebugTimer``."""
        message = 'Getting %s from %s' % (uri, self.directory)
        with DebugTimer(logger, message):
            return self.directory.get(uri)

    def browse(self, uri, limit=None):
        """Return ``self.directory.browse(uri, limit)``, wrapped in a
        ``DebugTimer``.
        """
        message = 'Browsing %s in %s' % (uri, self.directory)
        with DebugTimer(logger, message):
            return self.directory.browse(uri, limit)
import logging
from mopidy import backend
import pykka
from .library import BeetsLocalLibraryProvider
logger = logging.getLogger(__name__)
class BeetsLocalBackend(pykka.ThreadingActor, backend.Backend):
    """Backend actor for the ``beetslocal`` URI scheme."""

    def __init__(self, config, audio):
        """Read the ``beetslocal`` settings and create the providers.

        :param config: subscriptable configuration (presumably a mapping)
            with a ``beetslocal`` section holding ``beetslibrary`` and
            ``use_original_release_date``
        :param audio: passed on to ``BeetsLocalPlaybackProvider``
        """
        super(BeetsLocalBackend, self).__init__()
        settings = config['beetslocal']
        self.beetslibrary = settings['beetslibrary']
        self.use_original_release_date = settings['use_original_release_date']
        # Pass the value as a logger argument instead of pre-formatting
        # with ``%``: the message is only built when DEBUG is enabled, and
        # a tuple-valued setting cannot be misread as a sequence of
        # format arguments.
        logger.debug("Got library %s", self.beetslibrary)
        self.playback = BeetsLocalPlaybackProvider(audio=audio, backend=self)
        self.library = BeetsLocalLibraryProvider(backend=self)
        self.playlists = None
        self.uri_schemes = ['beetslocal']

    def _extract_uri(self, uri):
        """Reject URIs that do not use the ``beetslocal:`` scheme.

        NOTE(review): the visible code only validates the prefix and
        returns nothing; confirm whether more of this method exists.

        :param uri: URI string to check
        :raises ValueError: if ``uri`` does not start with ``beetslocal:``
        """
        logger.debug("convert uri = %s", uri.encode('ascii', 'ignore'))
        if not uri.startswith('beetslocal:'):
            raise ValueError('Invalid URI.')
from mopidy import audio, core
import pykka
import logging
import threading
import os
import time
from .ws_client import WSClient
from mopidy_jellyfin import Extension
logger = logging.getLogger(__name__)
class EventMonitorFrontend(
    pykka.ThreadingActor,
    core.CoreListener,
    audio.AudioListener
):
    # Sends events and playback updates back to Jellyfin server

    def __init__(self, config, core):
        """Store core and config, then resolve the server hostname.

        :param config: configuration with a ``jellyfin`` section; its
            ``hostname`` entry is read and the whole object is handed to
            ``self._read_token``
        :param core: stored on the instance as ``self.core``
        """
        super(EventMonitorFrontend, self).__init__()
        self.core = core
        self.token = self._read_token(config)
        self.config = config
        self.hostname = config['jellyfin'].get('hostname')
        self.wsc = WSClient(self)

        # Adopt whatever URL the redirect check reports when it differs
        # from the configured hostname.
        resolved_url = self.wsc.http.check_redirect(self.hostname)
        if self.hostname != resolved_url:
            self.hostname = resolved_url
from __future__ import unicode_literals
from mopidy.core import CoreListener
import mem
import pykka
import logging
# import logger
logger = logging.getLogger(__name__)
class IrisFrontend(pykka.ThreadingActor, CoreListener):
    """Frontend actor that forwards its callbacks to ``mem.iris``."""

    def __init__(self, config, core):
        """Store ``core`` and ``config`` on the module-level ``mem.iris``."""
        super(IrisFrontend, self).__init__()
        mem.iris.core = core
        mem.iris.config = config

    def on_start(self):
        """Call ``mem.iris.start()``."""
        mem.iris.start()

    def on_stop(self):
        """Call ``mem.iris.stop()``."""
        mem.iris.stop()

    def track_playback_ended(self, tl_track, time_position):
        """Call ``mem.iris.check_for_radio_update()``.

        Neither ``tl_track`` nor ``time_position`` is used here.
        """
        mem.iris.check_for_radio_update()
def tracklist_changed(self):
import logging
from mopidy import backend
import pykka
from mopidy_emby.library import EmbyLibraryProvider
from mopidy_emby.playback import EmbyPlaybackProvider
from mopidy_emby.remote import EmbyHandler
logger = logging.getLogger(__name__)
class EmbyBackend(pykka.ThreadingActor, backend.Backend):
    """Backend actor for the ``emby`` URI scheme."""

    uri_schemes = ['emby']

    def __init__(self, config, audio):
        """Create the library and playback providers and the remote handler.

        :param config: passed to ``EmbyHandler``
        :param audio: passed to ``EmbyPlaybackProvider``
        """
        super(EmbyBackend, self).__init__()
        self.library = EmbyLibraryProvider(backend=self)
        self.playback = EmbyPlaybackProvider(audio=audio, backend=self)
        # The backend attribute for the playlists provider is spelled
        # ``playlists`` (plural); state explicitly that there is none.
        self.playlists = None
        # Kept for backward compatibility with any code that reads the
        # old singular attribute.
        self.playlist = None
        self.remote = EmbyHandler(config)
def _format_test_result(mutation_record, test_result):
    """Build a two-line, human-readable summary of one test result.

    The first line holds the outcome, the activation record's description
    and the ``filename:lineno`` location; the second line holds the
    reason. ``description`` and ``line_number`` fall back to an empty
    string when missing from the activation record.
    """
    activation = mutation_record.activation_record
    template = '{outcome} -> {desc} @ {filename}:{lineno}\n{reason}'
    fields = {
        'outcome': test_result.outcome,
        'desc': activation.get('description', ''),
        'filename': mutation_record.module_file,
        'lineno': activation.get('line_number', ''),
        'reason': test_result.results,
    }
    return template.format(**fields)
class QueueManager(pykka.ThreadingActor):
    """Actor that hands out mutation records to testers one at a time."""

    def __init__(self, mutation_records):
        """Wrap ``mutation_records`` in a single shared iterator."""
        super().__init__()  # pylint:disable=missing-super-argument
        self._record_iterator = iter(mutation_records)

    def next(self):
        """Return the next mutation record, or ``None`` once the records
        are exhausted.
        """
        # The two-argument form of ``next`` yields the default instead of
        # raising ``StopIteration``.
        return next(self._record_iterator, None)
from __future__ import unicode_literals
import logging
import pykka
from agent import EvtDevAgent
logger = logging.getLogger(__name__)
class EvtDevFrontend(pykka.ThreadingActor):
def __init__(self, config, core):
    """Read the ``evtdev`` settings and create the device agent.

    :param config: configuration with an ``evtdev`` section holding
        ``dev_dir``, ``devices``, ``vol_step_size`` and ``refresh``
    :param core: passed through to ``EvtDevAgent`` as its first argument
    """
    super(EvtDevFrontend, self).__init__()
    settings = config['evtdev']
    agent_args = (
        settings['dev_dir'],
        settings['devices'],
        settings['vol_step_size'],
        settings['refresh'],
    )

    # EvtDevAgent performs all the handling of device
    # key presses on our behalf
    self.agent = EvtDevAgent(core, *agent_args)
    logger.info('EvtDevAgent started')
def on_stop(self):
"""