Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _ctx_init(self):
    """(Re)create the context handle and register its callbacks.

    If ``self._ctx`` already holds a context, it is disconnected and
    unreferenced first, while holding ``self._loop_lock``.  A new context
    is then created from ``self._api`` and ``self.name`` and stored in
    ``self._ctx``, and the state / subscribe callbacks are attached to it.
    """
    if self._ctx:
        # Drop the previous context under the loop lock before replacing it.
        with self._loop_lock:
            self.disconnect()
            c.pa.context_unref(self._ctx)
    self._ctx = c.pa.context_new(self._api, self.name)
    # NOTE(review): the trailing None is presumably the callback userdata
    # pointer - confirm against the c.pa binding.
    c.pa.context_set_state_callback(self._ctx, self._pa_state_cb, None)
    c.pa.context_set_subscribe_callback(self._ctx, self._pa_subscribe_cb, None)
def close(self):
    """Release the main loop and the context held by this object.

    Behaviour depends on the current state:

    * No loop (``self._loop`` is falsy): nothing to do, returns at once.
    * Loop currently running (``self._loop_running``): this is taken to
      mean the call comes from another thread, so only ``_loop_closed``
      is set and the main loop is asked to quit; actual cleanup is
      presumably done by the thread that is running the loop.
    * Otherwise: under ``self._loop_lock`` the context is disconnected
      and unreferenced and the main loop is freed.  ``self._ctx`` and
      ``self._loop`` are reset to ``None`` even if one of those steps
      raises.
    """
    if not self._loop:
        return
    if self._loop_running:  # called from another thread
        self._loop_closed = True
        c.pa.mainloop_quit(self._loop, 0)
        return  # presumably will be closed in a thread that's running it
    with self._loop_lock:
        try:
            self.disconnect()
            c.pa.context_unref(self._ctx)
            c.pa.mainloop_free(self._loop)
        finally:
            # Never keep references to handles that may already be freed.
            self._ctx = self._loop = None