Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init(self):
    '''Prepare callbacks, mainloop, context and event attributes of this instance.

    Wraps the state/subscribe handlers into their ctypes callback types,
    creates a new mainloop together with a FakeLock, fetches the mainloop API
    handle and return-value holder, runs self._ctx_init(), and finally fills
    in the sorted lists of known event types, facilities and masks.
    No event callback is set initially.'''
    # ctypes callback objects wrapping the bound handler methods.
    self._pa_state_cb = c.PA_STATE_CB_T(self._pulse_state_cb)
    self._pa_subscribe_cb = c.PA_SUBSCRIBE_CB_T(self._pulse_subscribe_cb)

    # Both values are created before either attribute is assigned.
    mainloop, loop_lock = c.pa.mainloop_new(), FakeLock()
    self._loop = mainloop
    self._loop_lock = loop_lock

    # Loop starts out neither running nor closed.
    self._loop_running = False
    self._loop_closed = False

    self._api = c.pa.mainloop_get_api(self._loop)
    self._ret = c.pa.return_value()

    self._ctx_init()

    # Sorted values of the event-related enums.
    self.event_types = sorted(PulseEventTypeEnum._values.values())
    self.event_facilities = sorted(PulseEventFacilityEnum._values.values())
    self.event_masks = sorted(PulseEventMaskEnum._values.values())
    self.event_callback = None
@c.PA_STREAM_REQUEST_CB_T
def read_cb(s, bs, userdata):
    '''Stream read callback: track the largest first-sample value seen so far.

    Peeks at the stream buffer and, when a non-NULL buffer of at least 4 bytes
    is available, reads its first element as a c.c_float and stores the
    maximum of that and the current value in samples[0].
    Whenever the peeked size is non-zero, the buffered data is dropped.'''
    data_ptr, nbytes = c.c_void_p(), c.c_int(bs)
    c.pa.stream_peek(s, data_ptr, c.byref(nbytes))
    try:
        if data_ptr and nbytes.value >= 4:
            # The float is read via a plain pointer cast, no byte-order conversion is done here.
            first_sample = c.cast(data_ptr, c.POINTER(c.c_float))[0]
            samples[0] = max(samples[0], first_sample)
    finally:
        # stream_drop() flushes buffered data (incl. buff=NULL "hole" data)
        # stream.h: "should not be called if the buffer is empty"
        if nbytes.value:
            c.pa.stream_drop(s)
# NOTE(review): the header of the enclosing function is not visible in this excerpt;
#  s, samples, stream_idx, source and timeout are defined somewhere above it.
# When a stream index is given, the recording stream is set to monitor that stream.
if stream_idx is not None: c.pa.stream_set_monitor_stream(s, stream_idx)
# Register read_cb (defined just above) as the stream read callback, with no userdata.
c.pa.stream_set_read_callback(s, read_cb, None)
# Source name, when given, is passed on as utf-8 encoded bytes.
# NOTE(review): `unicode` is a py2 builtin - presumably aliased elsewhere for py3, confirm.
if source is not None: source = unicode(source).encode('utf-8')
# fragsize=4 matches the 4-byte minimum that read_cb checks for.
# NOTE(review): maxlength=2**32-1 is presumably the "use default" sentinel - confirm against libpulse docs.
try:
c.pa.stream_connect_record( s, source,
c.PA_BUFFER_ATTR(fragsize=4, maxlength=2**32-1),
c.PA_STREAM_DONT_MOVE | c.PA_STREAM_PEAK_DETECT |
c.PA_STREAM_ADJUST_LATENCY | c.PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND )
except c.pa.CallError:
# Connecting failed: release the stream reference, then propagate the error.
c.pa.stream_unref(s)
raise
# Poll for `timeout`, then always try to disconnect the stream afterwards.
try: self._pulse_poll(timeout)
finally:
try: c.pa.stream_disconnect(s)
except c.pa.CallError: pass # stream was removed
# Result is the collected peak value, capped at 1.0.
return min(1.0, samples[0])
# Sink / source control methods, each built by _pulse_method_call from a
#  libpulse context function plus a lambda that maps the caller-supplied
#  arguments to the values passed after the index.

# -- sinks and sink inputs
sink_mute = _pulse_method_call(
    c.pa.context_set_sink_mute_by_index,
    lambda mute=True: mute )
sink_input_volume_set = _pulse_method_call(
    c.pa.context_set_sink_input_volume,
    lambda vol: vol.to_struct() )
sink_volume_set = _pulse_method_call(
    c.pa.context_set_sink_volume_by_index,
    lambda vol: vol.to_struct() )
sink_suspend = _pulse_method_call(
    c.pa.context_suspend_sink_by_index,
    lambda suspend=True: suspend )
# Port can be given either as a PulsePortInfo (its name is used) or as-is.
sink_port_set = _pulse_method_call(
    c.pa.context_set_sink_port_by_index,
    lambda port: port.name if isinstance(port, PulsePortInfo) else port )

# -- sources and source outputs
source_output_mute = _pulse_method_call(
    c.pa.context_set_source_output_mute,
    lambda mute=True: mute )
# NOTE(review): argument is called sink_index although the call moves a source output.
source_output_move = _pulse_method_call(
    c.pa.context_move_source_output_by_index,
    lambda sink_index: sink_index )
source_mute = _pulse_method_call(
    c.pa.context_set_source_mute_by_index,
    lambda mute=True: mute )
source_output_volume_set = _pulse_method_call(
    c.pa.context_set_source_output_volume,
    lambda vol: vol.to_struct() )
source_volume_set = _pulse_method_call(
    c.pa.context_set_source_volume_by_index,
    lambda vol: vol.to_struct() )
source_suspend = _pulse_method_call(
    c.pa.context_suspend_source_by_index,
    lambda suspend=True: suspend )
# Port can be given either as a PulsePortInfo (its name is used) or as-is.
source_port_set = _pulse_method_call(
    c.pa.context_set_source_port_by_index,
    lambda port: port.name if isinstance(port, PulsePortInfo) else port )
# NOTE(review): this definition ends after normalizing its arguments and never
#  calls into libpulse or returns a value. A later module_load definition in this
#  file starts with the same two statements and goes on to load the module -
#  this one looks like a truncated copy of it, confirm and remove if so.
def module_load(self, name, args=''):
# A list of arguments is joined into a single space-separated string.
if is_list(args): args = ' '.join(args)
# Both name and args are passed through c.force_bytes.
name, args = map(c.force_bytes, [name, args])
# Info getters, each built by _pulse_get_list from a ctypes callback type,
#  a libpulse context getter function and the class used for the results.

# -- sink inputs / source outputs
sink_input_list = _pulse_get_list(
    c.PA_SINK_INPUT_INFO_CB_T,
    c.pa.context_get_sink_input_info_list,
    PulseSinkInputInfo )
sink_input_info = _pulse_get_list(
    c.PA_SINK_INPUT_INFO_CB_T,
    c.pa.context_get_sink_input_info,
    PulseSinkInputInfo )
source_output_list = _pulse_get_list(
    c.PA_SOURCE_OUTPUT_INFO_CB_T,
    c.pa.context_get_source_output_info_list,
    PulseSourceOutputInfo )
source_output_info = _pulse_get_list(
    c.PA_SOURCE_OUTPUT_INFO_CB_T,
    c.pa.context_get_source_output_info,
    PulseSourceOutputInfo )

# -- sinks / sources
sink_list = _pulse_get_list(
    c.PA_SINK_INFO_CB_T,
    c.pa.context_get_sink_info_list,
    PulseSinkInfo )
sink_info = _pulse_get_list(
    c.PA_SINK_INFO_CB_T,
    c.pa.context_get_sink_info_by_index,
    PulseSinkInfo )
source_list = _pulse_get_list(
    c.PA_SOURCE_INFO_CB_T,
    c.pa.context_get_source_info_list,
    PulseSourceInfo )
source_info = _pulse_get_list(
    c.PA_SOURCE_INFO_CB_T,
    c.pa.context_get_source_info_by_index,
    PulseSourceInfo )

# -- cards / clients
card_list = _pulse_get_list(
    c.PA_CARD_INFO_CB_T,
    c.pa.context_get_card_info_list,
    PulseCardInfo )
card_info = _pulse_get_list(
    c.PA_CARD_INFO_CB_T,
    c.pa.context_get_card_info_by_index,
    PulseCardInfo )
client_list = _pulse_get_list(
    c.PA_CLIENT_INFO_CB_T,
    c.pa.context_get_client_info_list,
    PulseClientInfo )
client_info = _pulse_get_list(
    c.PA_CLIENT_INFO_CB_T,
    c.pa.context_get_client_info,
    PulseClientInfo )

# -- server (the only getter here created with singleton=True)
server_info = _pulse_get_list(
    c.PA_SERVER_INFO_CB_T,
    c.pa.context_get_server_info,
    PulseServerInfo,
    singleton=True )
# Control methods built by _pulse_method_call from a libpulse context function
#  plus a lambda that maps caller-supplied arguments to the values passed on.
# Each name is defined exactly once here - the original run repeated the
#  source_default_set .. source_mute definitions verbatim, which only re-created
#  and re-bound identical wrappers.

card_profile_set_by_index = _pulse_method_call(
    c.pa.context_set_card_profile_by_index, lambda profile_name: profile_name )

# Default sink/source setters are created with index_arg=False,
#  and accept either an info object (its name is used) or a value passed as-is.
sink_default_set = _pulse_method_call(
    c.pa.context_set_default_sink, index_arg=False,
    func=lambda sink: sink.name if isinstance(sink, PulseSinkInfo) else sink )
source_default_set = _pulse_method_call(
    c.pa.context_set_default_source, index_arg=False,
    func=lambda source: source.name if isinstance(source, PulseSourceInfo) else source )

# -- sinks and sink inputs
sink_input_mute = _pulse_method_call(
    c.pa.context_set_sink_input_mute, lambda mute=True: mute )
sink_input_move = _pulse_method_call(
    c.pa.context_move_sink_input_by_index, lambda sink_index: sink_index )
sink_mute = _pulse_method_call(
    c.pa.context_set_sink_mute_by_index, lambda mute=True: mute )
sink_input_volume_set = _pulse_method_call(
    c.pa.context_set_sink_input_volume, lambda vol: vol.to_struct() )
sink_volume_set = _pulse_method_call(
    c.pa.context_set_sink_volume_by_index, lambda vol: vol.to_struct() )
sink_suspend = _pulse_method_call(
    c.pa.context_suspend_sink_by_index, lambda suspend=True: suspend )
# Port can be given either as a PulsePortInfo (its name is used) or as-is.
sink_port_set = _pulse_method_call(
    c.pa.context_set_sink_port_by_index,
    lambda port: port.name if isinstance(port, PulsePortInfo) else port )

# -- sources and source outputs
source_output_mute = _pulse_method_call(
    c.pa.context_set_source_output_mute, lambda mute=True: mute )
# NOTE(review): argument is called sink_index although the call moves a source
#  output; name kept as-is, since callers may pass it as a keyword.
source_output_move = _pulse_method_call(
    c.pa.context_move_source_output_by_index, lambda sink_index: sink_index )
source_mute = _pulse_method_call(
    c.pa.context_set_source_mute_by_index, lambda mute=True: mute )
source_output_volume_set = _pulse_method_call(
    c.pa.context_set_source_output_volume, lambda vol: vol.to_struct() )
source_volume_set = _pulse_method_call(
    c.pa.context_set_source_volume_by_index, lambda vol: vol.to_struct() )
source_suspend = _pulse_method_call(
    c.pa.context_suspend_source_by_index, lambda suspend=True: suspend )
'' if pulse_func.__name__.endswith('_list') or singleton or not index_arg else 'index' )
return _wrapper_method
# Info getters, each built by _pulse_get_list from a ctypes callback type,
#  a libpulse context getter function and the class used for the results.

# -- lookups using the *_by_name libpulse getters
get_sink_by_name = _pulse_get_list(
    c.PA_SINK_INFO_CB_T,
    c.pa.context_get_sink_info_by_name,
    PulseSinkInfo )
get_source_by_name = _pulse_get_list(
    c.PA_SOURCE_INFO_CB_T,
    c.pa.context_get_source_info_by_name,
    PulseSourceInfo )
get_card_by_name = _pulse_get_list(
    c.PA_CARD_INFO_CB_T,
    c.pa.context_get_card_info_by_name,
    PulseCardInfo )

# -- sink inputs / source outputs
sink_input_list = _pulse_get_list(
    c.PA_SINK_INPUT_INFO_CB_T,
    c.pa.context_get_sink_input_info_list,
    PulseSinkInputInfo )
sink_input_info = _pulse_get_list(
    c.PA_SINK_INPUT_INFO_CB_T,
    c.pa.context_get_sink_input_info,
    PulseSinkInputInfo )
source_output_list = _pulse_get_list(
    c.PA_SOURCE_OUTPUT_INFO_CB_T,
    c.pa.context_get_source_output_info_list,
    PulseSourceOutputInfo )
source_output_info = _pulse_get_list(
    c.PA_SOURCE_OUTPUT_INFO_CB_T,
    c.pa.context_get_source_output_info,
    PulseSourceOutputInfo )

# -- sinks / sources
sink_list = _pulse_get_list(
    c.PA_SINK_INFO_CB_T,
    c.pa.context_get_sink_info_list,
    PulseSinkInfo )
sink_info = _pulse_get_list(
    c.PA_SINK_INFO_CB_T,
    c.pa.context_get_sink_info_by_index,
    PulseSinkInfo )
source_list = _pulse_get_list(
    c.PA_SOURCE_INFO_CB_T,
    c.pa.context_get_source_info_list,
    PulseSourceInfo )
def module_load(self, name, args=''):
    '''Load module with the specified name and arguments, return its index.

    args can be a list, in which case its elements are joined with spaces.
    Both name and args are passed through c.force_bytes before the call.
    Raises PulseOperationInvalid if context_load_module raises c.pa.CallError.'''
    if is_list(args):
        args = ' '.join(args)
    name = c.force_bytes(name)
    args = c.force_bytes(args)

    loaded = list()
    with self._pulse_op_cb(raw=True) as op_done:
        def _on_index(ctx, index, userdata):
            # Record the reported index, then invoke the operation callback.
            loaded.append(index)
            return op_done()
        index_cb = c.PA_CONTEXT_INDEX_CB_T(_on_index)
        try:
            c.pa.context_load_module(self._ctx, name, args, index_cb, None)
        except c.pa.CallError as err:
            raise PulseOperationInvalid(err.args[-1])

    # Exactly one index is expected - unpacking fails otherwise.
    module_index, = loaded
    return module_index
# Info getters, each built by _pulse_get_list from a ctypes callback type,
#  a libpulse context getter function and the class used for the results.

# -- sources
source_list = _pulse_get_list(
    c.PA_SOURCE_INFO_CB_T,
    c.pa.context_get_source_info_list,
    PulseSourceInfo )
source_info = _pulse_get_list(
    c.PA_SOURCE_INFO_CB_T,
    c.pa.context_get_source_info_by_index,
    PulseSourceInfo )

# -- cards / clients
card_list = _pulse_get_list(
    c.PA_CARD_INFO_CB_T,
    c.pa.context_get_card_info_list,
    PulseCardInfo )
card_info = _pulse_get_list(
    c.PA_CARD_INFO_CB_T,
    c.pa.context_get_card_info_by_index,
    PulseCardInfo )
client_list = _pulse_get_list(
    c.PA_CLIENT_INFO_CB_T,
    c.pa.context_get_client_info_list,
    PulseClientInfo )
client_info = _pulse_get_list(
    c.PA_CLIENT_INFO_CB_T,
    c.pa.context_get_client_info,
    PulseClientInfo )

# -- server (the only getter here created with singleton=True)
server_info = _pulse_get_list(
    c.PA_SERVER_INFO_CB_T,
    c.pa.context_get_server_info,
    PulseServerInfo,
    singleton=True )

# -- modules
module_info = _pulse_get_list(
    c.PA_MODULE_INFO_CB_T,
    c.pa.context_get_module_info,
    PulseModuleInfo )
module_list = _pulse_get_list(
    c.PA_MODULE_INFO_CB_T,
    c.pa.context_get_module_info_list,
    PulseModuleInfo )
def _pulse_method_call(pulse_op, func=None, index_arg=True):
'''Creates following synchronous wrapper for async pa_operation callable:
wrapper(index, ...) -> pulse_op(index, [*]args_func(...))
index_arg=False: wrapper(...) -> pulse_op([*]args_func(...))'''
def _wrapper(self, *args, **kws):
if index_arg:
if 'index' in kws: index = kws.pop('index')
else: index, args = args[0], args[1:]
pulse_args = func(*args, **kws) if func else list()
if not is_list(pulse_args): pulse_args = [pulse_args]
if index_arg: pulse_args = [index] + list(pulse_args)
with self._pulse_op_cb() as cb: