Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def connect(self, autospawn=False, wait=False):
    '''Establish a connection to the pulseaudio server.

    autospawn -- if true, PA_CONTEXT_NOAUTOSPAWN is not set,
      so a new pulse daemon will be started if necessary.
    wait -- if true, PA_CONTEXT_NOFAIL is set, making the call
      block until a pulseaudio server appears.

    Raises PulseError if the eventloop of this instance was already
    destroyed, or if the connection attempt fails.'''
    if self._loop_closed:
        raise PulseError('Eventloop object was already'
            ' destroyed and cannot be reused from this instance.')

    # Any earlier connect attempt (successful or not) left self.connected
    # set to a non-None value, in which case the context is re-initialized.
    if self.connected is not None:
        self._ctx_init()

    flags = 0
    self.connected = None
    if not autospawn:
        flags |= c.PA_CONTEXT_NOAUTOSPAWN
    if wait:
        flags |= c.PA_CONTEXT_NOFAIL

    try:
        c.pa.context_connect(self._ctx, self.server, flags, None)
    except c.pa.CallError:
        self.connected = False

    # Iterate until self.connected gets resolved to True/False
    # (presumably by a context state callback - not visible here).
    while self.connected is None:
        self._pulse_iterate()

    if self.connected is False:
        raise PulseError('Failed to connect to pulseaudio server')
def connect(self, autospawn=False, wait=False):
    '''Establish a connection to the pulseaudio server.

    autospawn -- if true, PA_CONTEXT_NOAUTOSPAWN is not set,
      so a new pulse daemon will be started if necessary.
    wait -- if true, PA_CONTEXT_NOFAIL is set, making the call
      block until a pulseaudio server appears.

    Raises PulseError if the eventloop of this instance was already
    destroyed, or if the connection attempt fails.'''
    if self._loop_closed:
        raise PulseError('Eventloop object was already'
            ' destroyed and cannot be reused from this instance.')

    # Any earlier connect attempt (successful or not) left self.connected
    # set to a non-None value, in which case the context is re-initialized.
    if self.connected is not None:
        self._ctx_init()

    flags = 0
    self.connected = None
    if not autospawn:
        flags |= c.PA_CONTEXT_NOAUTOSPAWN
    if wait:
        flags |= c.PA_CONTEXT_NOFAIL

    try:
        c.pa.context_connect(self._ctx, self.server, flags, None)
    except c.pa.CallError:
        self.connected = False

    # Iterate until self.connected gets resolved to True/False
    # (presumably by a context state callback - not visible here).
    while self.connected is None:
        self._pulse_iterate()

    if self.connected is False:
        raise PulseError('Failed to connect to pulseaudio server')
# Module-level Enum objects, one per group of pulse constants.
# Each is built from a string label and a *_MAP constant taken from "c"
# (presumably the libpulse bindings module - confirm against its import).
PulseEventTypeEnum = Enum('event-type', c.PA_EVENT_TYPE_MAP)
PulseEventFacilityEnum = Enum('event-facility', c.PA_EVENT_FACILITY_MAP)
PulseEventMaskEnum = Enum('event-mask', c.PA_EVENT_MASK_MAP)
PulseStateEnum = Enum('sink/source-state', c.PA_OBJ_STATE_MAP)
PulseUpdateEnum = Enum('update-type', c.PA_UPDATE_MAP)
PulsePortAvailableEnum = Enum('available', c.PA_PORT_AVAILABLE_MAP)
PulseDirectionEnum = Enum('direction', c.PA_DIRECTION_MAP)
class PulseError(Exception):
    '''Common base for the error exceptions defined below.'''


class PulseOperationFailed(PulseError):
    '''PulseError subtype for a failed operation.'''


class PulseOperationInvalid(PulseOperationFailed):
    '''PulseOperationFailed subtype for an invalid operation.'''


class PulseIndexError(PulseError):
    '''PulseError subtype for index-related errors.'''


class PulseLoopStop(Exception):
    '''Control-flow exception, deliberately not a PulseError subclass.'''


class PulseDisconnected(Exception):
    '''Disconnect signal, deliberately not a PulseError subclass.'''
class PulseObject(object):
c_struct_wrappers = dict()
def __init__(self, struct=None, *field_data_list, **field_data_dict):
field_data, fields = dict(), getattr(self, 'c_struct_fields', list())
if is_str_native(fields): fields = self.c_struct_fields = fields.split()
if field_data_list: field_data.update(zip(fields, field_data_list))
if field_data_dict: field_data.update(field_data_dict)
if struct is None: field_data, struct = dict(), field_data
assert not set(field_data.keys()).difference(fields)
if field_data: self._copy_struct_fields(field_data, fields=field_data.keys())
into a mutex lock, and should only be needed if same-instance methods
will/should/might be called from different threads at the same time.'''
self.name = client_name or 'pulsectl'
self.server, self.connected = server, None
self._ret = self._ctx = self._loop = self._api = None
self._actions, self._action_ids = dict(),\
it.chain.from_iterable(map(range, it.repeat(2**30)))
self.init()
if threading_lock:
if threading_lock is True:
import threading
threading_lock = threading.Lock()
self._loop_lock = threading_lock
if connect:
try: self.connect(autospawn=True)
except PulseError:
self.close()
raise
def __repr__(self):
    '''Return a debug string with the enum name and its sorted value keys.'''
    # The format template used to be an empty string, so repr() always
    # returned '' and silently discarded both arguments.
    return '<Enum {} [{}]>'.format(self._name, ' '.join(sorted(self._values.keys())))
# Module-level Enum objects, one per group of pulse constants.
# Each is built from a string label and a *_MAP constant taken from "c"
# (presumably the libpulse bindings module - confirm against its import).
PulseEventTypeEnum = Enum('event-type', c.PA_EVENT_TYPE_MAP)
PulseEventFacilityEnum = Enum('event-facility', c.PA_EVENT_FACILITY_MAP)
PulseEventMaskEnum = Enum('event-mask', c.PA_EVENT_MASK_MAP)
PulseStateEnum = Enum('sink/source-state', c.PA_OBJ_STATE_MAP)
PulseUpdateEnum = Enum('update-type', c.PA_UPDATE_MAP)
PulsePortAvailableEnum = Enum('available', c.PA_PORT_AVAILABLE_MAP)
PulseDirectionEnum = Enum('direction', c.PA_DIRECTION_MAP)
class PulseError(Exception):
    '''Common base for the error exceptions defined below.'''


class PulseOperationFailed(PulseError):
    '''PulseError subtype for a failed operation.'''


class PulseOperationInvalid(PulseOperationFailed):
    '''PulseOperationFailed subtype for an invalid operation.'''


class PulseIndexError(PulseError):
    '''PulseError subtype for index-related errors.'''


class PulseLoopStop(Exception):
    '''Control-flow exception, deliberately not a PulseError subclass.'''


class PulseDisconnected(Exception):
    '''Disconnect signal, deliberately not a PulseError subclass.'''
class PulseObject(object):
c_struct_wrappers = dict()
def __init__(self, struct=None, *field_data_list, **field_data_dict):
field_data, fields = dict(), getattr(self, 'c_struct_fields', list())
if is_str_native(fields): fields = self.c_struct_fields = fields.split()
if field_data_list: field_data.update(zip(fields, field_data_list))
if field_data_dict: field_data.update(field_data_dict)
if struct is None: field_data, struct = dict(), field_data
import socket, errno, signal, time
s, n = None, attempts if attempts > 0 else None
try:
pid_path, sock_af, sock_t = None, socket.AF_UNIX, socket.SOCK_STREAM
if not server: server, pid_path = map(c.pa.runtime_path, ['cli', 'pid'])
else:
if not is_list(server):
server = c.force_str(server)
if not server.startswith('/'): server = server, 4712 # default port
if is_list(server):
try:
addrinfo = socket.getaddrinfo(
server[0], server[1], 0, sock_t, socket.IPPROTO_TCP )
if not addrinfo: raise socket.gaierror('No addrinfo for socket: {}'.format(server))
except (socket.gaierror, socket.error) as err:
raise PulseError( 'Failed to resolve socket parameters'
' (address, family) via getaddrinfo: {!r} - {} {}'.format(server, type(err), err) )
sock_af, sock_t, _, _, server = addrinfo[0]
s = socket.socket(sock_af, sock_t)
s.settimeout(socket_timeout)
while True:
ts = c.mono_time()
try: s.connect(server)
except socket.error as err:
if err.errno not in [errno.ECONNREFUSED, errno.ENOENT, errno.ECONNABORTED]: raise
else: break
if n:
n -= 1
if n <= 0: raise PulseError('Number of connection attempts ({}) exceeded'.format(attempts))
if pid_path:
with open(pid_path) as src: os.kill(int(src.read().strip()), signal.SIGUSR2)
def _pulse_loop(self):
    '''Generator that hands out the pulse eventloop object under a lock.

    Holds self._loop_lock for its whole duration, flags the loop as running,
    yields self._loop once and resets the running flag afterwards.
    Returns without yielding if there is no loop object.
    Raises PulseError if the loop is already flagged as running.

    NOTE(review): presumably used via a contextmanager decorator
    which is not visible in this chunk - confirm.'''
    with self._loop_lock:
        if not self._loop:
            return

        if self._loop_running:
            reentry_msg = (
                'Running blocking pulse operations from pulse eventloop callbacks'
                ' or other threads while loop is running is not supported by this python module.'
                ' Supporting this would require threads or proper asyncio/twisted-like async code.'
                ' Workaround can be to stop the loop'
                ' (raise PulseLoopStop in callback or event_loop_stop() from another thread),'
                ' doing whatever pulse calls synchronously and then resuming event_listen() loop.' )
            raise PulseError(reentry_msg)

        self._loop_running = True
        self._loop_stop = False
        try:
            yield self._loop
        finally:
            self._loop_running = False
            # to free() after stopping it
            if self._loop_closed:
                self.close()