Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from pykka.actor import ThreadingActor
from pykka.registry import ActorRegistry
from mopidy.mixers.base import BaseMixer
from mopidy.gstreamer import GStreamer
class GStreamerSoftwareMixer(ThreadingActor, BaseMixer):
    """Software mixer that delegates volume control to the GStreamer actor.

    The proxy to the GStreamer actor is resolved in :meth:`on_start` by
    looking the actor up in the actor registry; until then ``output`` is
    ``None``.
    """

    def __init__(self):
        super(GStreamerSoftwareMixer, self).__init__()
        # Replaced in on_start() by a proxy for the running GStreamer actor.
        self.output = None

    def on_start(self):
        """Find the single registered GStreamer actor and keep a proxy to it.

        Presumably invoked by pykka when this actor starts -- confirm
        against the pykka actor lifecycle documentation.
        """
        gstreamer_refs = ActorRegistry.get_by_class(GStreamer)
        assert len(gstreamer_refs) == 1, 'Expected exactly one running output.'
        gstreamer_ref = gstreamer_refs[0]
        self.output = gstreamer_ref.proxy()

    def get_volume(self):
        """Ask the GStreamer proxy for its volume and return the result."""
        pending = self.output.get_volume()
        return pending.get()

    def set_volume(self, volume):
        """Forward ``volume`` to the GStreamer proxy and wait for the call."""
        pending = self.output.set_volume(volume)
        # The result is awaited but deliberately not returned.
        pending.get()
from pykka.actor import ThreadingActor
from mopidy.mixers.base import BaseMixer
class DummyMixer(ThreadingActor, BaseMixer):
    """In-memory mixer that remembers the last volume it was given."""

    def __init__(self):
        super(DummyMixer, self).__init__()
        # No volume is known until set_volume() has been called.
        self._volume = None

    def get_volume(self):
        """Return the most recently stored volume, or ``None`` if unset."""
        return self._volume

    def set_volume(self, volume):
        """Store ``volume`` so that later get_volume() calls report it."""
        self._volume = volume
import logging
from pykka.actor import ThreadingActor
from mopidy import settings
from mopidy.mixers.base import BaseMixer
# Module-level logger for the Denon mixer.
logger = logging.getLogger(u'mopidy.mixers.denon')
class DenonMixer(ThreadingActor, BaseMixer):
    """
    Mixer for controlling Denon amplifiers and receivers using the RS-232
    protocol.

    The external mixer is the authoritative source for the current volume.
    This allows the user to change the volume with the amplifier's own
    remote control without Mopidy cancelling the volume setting.

    **Dependencies**

    - pyserial (python-serial on Debian/Ubuntu)

    **Settings**

    - :attr:`mopidy.settings.MIXER_EXT_PORT` -- Example: ``/dev/ttyUSB0``
    """
from subprocess import Popen, PIPE
import time
from pykka.actor import ThreadingActor
from mopidy.mixers.base import BaseMixer
class OsaMixer(ThreadingActor, BaseMixer):
    """
    Mixer which controls volume on OS X through ``osascript``.

    **Dependencies:**

    - None

    **Settings:**

    - None
    """

    # Cache lifetime; presumably in seconds -- TODO confirm against the
    # code that reads it.
    CACHE_TTL = 30

    # Cache state; both start out unset.
    _cache = _last_update = None
from pykka.actor import ThreadingActor
from mopidy.outputs.base import BaseOutput
class DummyOutput(ThreadingActor, BaseOutput):
    """
    Audio output used for testing.

    The class-level attributes below all default to ``None``; per their
    own comments they hold the last values passed to the output's methods
    so that tests can inspect them.
    """

    # pylint: disable = R0902
    # Too many instance attributes (9/7)

    #: For testing. Contains the last URI passed to :meth:`play_uri`.
    uri = None

    #: For testing. Contains the last capabilities passed to
    #: :meth:`deliver_data`.
    capabilities = None

    #: For testing. Contains the last data passed to :meth:`deliver_data`.
    data = None
import alsaaudio
import logging
from pykka.actor import ThreadingActor
from mopidy import settings
from mopidy.mixers.base import BaseMixer
# Module-level logger for the ALSA mixer.
logger = logging.getLogger('mopidy.mixers.alsa')
class AlsaMixer(ThreadingActor, BaseMixer):
    """
    Mixer which controls volume through the Advanced Linux Sound
    Architecture (ALSA).

    **Dependencies:**

    - pyalsaaudio >= 0.2 (python-alsaaudio on Debian/Ubuntu)

    **Settings:**

    - :attr:`mopidy.settings.MIXER_ALSA_CONTROL`
    """

    def __init__(self):
        super(AlsaMixer, self).__init__()
        # Starts out unset; nothing visible here assigns a mixer handle.
        self._mixer = None