Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.launch(app_launched_callback)
def join(self, timeout=None):
"""Join the thread."""
self._socket_client.join(timeout=timeout)
def disconnect(self, timeout=None, blocking=True):
"""Disconnect the controller."""
self._socket_client.disconnect()
if blocking:
self.join(timeout=timeout)
# pylint: disable=too-many-public-methods
class PlexApiController(PlexController):
"""A controller that can use plexapi.."""
def __init__(self, pms):
super(PlexApiController, self).__init__()
self.pms = pms
def _get_current_media(self):
"""Get current media_item, media and part for pms."""
key = int(self.status.content_id.split("/")[-1])
media_item = self.pms.fetchItem(key).reload()
media_idx = self.status.media_custom_data.get("mediaIndex", 0)
part_idx = self.status.media_custom_data.get("partIndex", 0)
media = media_item.media[media_idx]
part = media.parts[part_idx]
return media_item, media, part
NEXT
MUTE
UNMUTE
VOLUME - also requires an int representing level from 0-100
"""
Log.Debug('Recieved a call to control playback')
params = sort_headers(['Uri', 'Cmd', 'Val'], False)
status = "Missing paramaters"
response = "Error"
if params is not False:
uri = params['Uri'].split(":")
cast = pychromecast.Chromecast(uri[0], int(uri[1]))
cast.wait()
pc = PlexController(cast)
Log.Debug("Handler namespace is %s" % pc.namespace)
cast.register_handler(pc)
Log.Debug("Handler namespace is %s" % pc.namespace)
cmd = params['Cmd']
Log.Debug("Command is " + cmd)
if cmd == "play":
pc.play()
if cmd == "pause":
pc.pause()
if cmd == "stop":
pc.stop()
if cmd == "next":
pc.next()
port = int(client_uri[1])
pc = False
msg = "No message received"
if 'Serverid' in values:
servers = fetch_servers()
for server in servers:
if server['id'] == values['Serverid']:
Log.Debug("Found a matching server!")
values['Serveruri'] = server['uri']
values['Version'] = server['version']
try:
cast = pychromecast.Chromecast(host, port)
cast.wait()
values['Type'] = cast.cast_type
pc = PlexController(cast)
cast.register_handler(pc)
Log.Debug("Sending values to play command: " + JSON.StringFromObject(values))
pc.play_media(values, log_data)
except pychromecast.LaunchError, pychromecast.PyChromecastError:
Log.Debug('Error connecting to host.')
status = "Error"
finally:
if pc is not False:
status = "Success"
oc = FlexContainer(attributes={
'Name': 'Playback Status',
'Status': status,
'Message': msg
})
uris = device['uri'].split(":")
host = uris[0]
port = uris[1]
cast = False
try:
cast = pychromecast.Chromecast(host, int(port), timeout=3, tries=1)
except Exception:
Log.Error("Unable to connecct to device.")
if cast:
Log.Debug("Waiting for devices.")
cast.wait(2)
app_id = cast.app_id
meta_dict = False
if app_id == "9AC194DC":
pc = PlexController(cast)
cast.register_handler(pc)
plex_status = pc.plex_status()
raw_status = {
'state': plex_status['state'],
'volume': plex_status['volume'],
'muted': plex_status['muted']
}
meta_dict = plex_status['meta']
if 'title' in meta_dict:
delements = []
i = 0
for session in session_statuses:
if (meta_dict['title'] == session['Video']['title']) & (host == session['address']):
delements.append(i)
meta_dict = session['Video']
del session['Video']
def __init__(self):
super(PlexController, self).__init__("urn:x-cast:plex", "9AC194DC")
self.app_id = "9AC194DC"
self.namespace = "urn:x-cast:plex"
self.request_id = 0
self.play_media_event = threading.Event()
self._last_play_msg = {}