Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init(self):
    '''Prepare callbacks, mainloop, context and event metadata for this instance.
        Wraps the state/subscribe handlers in their ctypes callback types,
            creates a new pulse mainloop (paired with a FakeLock), resets loop flags,
            fetches the mainloop api and a return-value holder, then calls _ctx_init().
        Also fills sorted lists of known event types/facilities/masks
            and clears the event callback.'''
    # ctypes callback objects wrapping bound methods, kept on the instance
    self._pa_state_cb = c.PA_STATE_CB_T(self._pulse_state_cb)
    self._pa_subscribe_cb = c.PA_SUBSCRIBE_CB_T(self._pulse_subscribe_cb)

    # Mainloop handle and its lock, followed by the loop state flags
    self._loop = c.pa.mainloop_new()
    self._loop_lock = FakeLock()
    self._loop_running = False
    self._loop_closed = False

    self._api = c.pa.mainloop_get_api(self._loop)
    self._ret = c.pa.return_value()

    # Context setup runs only after callbacks and mainloop api are in place
    self._ctx_init()

    # Sorted value lists from the event enums
    event_types = list(PulseEventTypeEnum._values.values())
    event_types.sort()
    self.event_types = event_types

    event_facilities = list(PulseEventFacilityEnum._values.values())
    event_facilities.sort()
    self.event_facilities = event_facilities

    event_masks = list(PulseEventMaskEnum._values.values())
    event_masks.sort()
    self.event_masks = event_masks

    self.event_callback = None
def connect(self, autospawn=False, wait=False):
    '''Establish a connection to the pulseaudio server.
        autospawn - allow a new pulse daemon to be started, if necessary.
        wait - block until pulseaudio server appears.
        Raises PulseError if the eventloop was already destroyed,
            or if connection attempt ends in failure.'''
    if self._loop_closed:
        raise PulseError('Eventloop object was already'
            ' destroyed and cannot be reused from this instance.')

    # Any earlier connection result (success or failure) means a fresh context is needed
    if self.connected is not None:
        self._ctx_init()
    self.connected = None

    flags = 0 if autospawn else c.PA_CONTEXT_NOAUTOSPAWN
    if wait:
        flags |= c.PA_CONTEXT_NOFAIL

    try:
        c.pa.context_connect(self._ctx, self.server, flags, None)
    except c.pa.CallError:
        self.connected = False

    # Iterate the loop until "connected" gets resolved to something other than None
    while self.connected is None:
        self._pulse_iterate()

    if self.connected is False:
        raise PulseError('Failed to connect to pulseaudio server')
def read_cb(s, bs, userdata):
    '''Stream read callback: fold the first float sample of peeked data into samples[0].
        s - stream handle, passed through to stream_peek() / stream_drop().
        bs - byte count, re-wrapped as c_int and passed by reference to stream_peek().
        userdata - unused.'''
    buff = c.c_void_p()
    bs = c.c_int(bs)
    c.pa.stream_peek(s, buff, c.byref(bs))
    try:
        # Need a non-NULL buffer with at least one 4-byte float in it
        if buff and bs.value >= 4:
            # Read as a native C float - original note: assumes same
            #  float byte order as pavucontrol does (stated there as BE)
            first_sample = c.cast(buff, c.POINTER(c.c_float))[0]
            samples[0] = max(samples[0], first_sample)
    finally:
        # stream_drop() discards buffered data, including buff=NULL "hole" data,
        #  but per stream.h "should not be called if the buffer is empty"
        if bs.value:
            c.pa.stream_drop(s)
def _wrapper(self, *args, **kws):
    '''Build the argument list and invoke pulse_op on this instance's context.
        When index_arg is set, index is taken from "index" keyword
            or else from the first positional argument.
        Remaining arguments go to func (if any) to produce pulse_op arguments.
        Raises TypeError on c.ArgumentError,
            PulseOperationInvalid on c.pa.CallError.'''
    if index_arg:
        if 'index' not in kws:
            index = args[0]
            args = args[1:]
        else:
            index = kws.pop('index')

    if func:
        pulse_args = func(*args, **kws)
    else:
        pulse_args = list()
    # A single non-list result is treated as one argument
    if not is_list(pulse_args):
        pulse_args = [pulse_args]
    if index_arg:
        pulse_args = [index] + list(pulse_args)

    with self._pulse_op_cb() as cb:
        try:
            # Operation callback and a None are always the trailing arguments
            call_args = list(pulse_args) + [cb, None]
            pulse_op(self._ctx, *call_args)
        except c.ArgumentError as err:
            raise TypeError(err.args)
        except c.pa.CallError as err:
            raise PulseOperationInvalid(err.args[-1])
# inspect.getargspec() was removed in Python 3.11, so prefer getfullargspec()
#  when it exists and fall back to getargspec() on interpreters without it.
# Only the first four fields (args, varargs, keywords, defaults) are kept,
#  so func_args has the same 4-element shape that getargspec() produced.
_argspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
func_args = list(_argspec(func or (lambda: None))[:4])