Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
video_url,
content_type,
current_time=kwargs.get("current_time"),
title=kwargs.get("title"),
thumb=kwargs.get("thumb"),
subtitles=kwargs.get("subtitles"),
)
self._controller.block_until_active()
def restore(self, data):
    """Replay previously saved media.

    Passes the stored ``content_id`` to ``play_media_url`` together with the
    saved ``current_time``, ``title`` and ``thumb`` values.

    :param data: mapping that must contain the keys ``content_id``,
        ``current_time``, ``title`` and ``thumb``.
    :raises KeyError: if any of those keys is missing from ``data``.
    """
    media_id = data["content_id"]
    saved_fields = {field: data[field] for field in ("current_time", "title", "thumb")}
    self.play_media_url(media_id, **saved_fields)
class DashCastController(CastController):
    """Cast controller for the DashCast app, which loads a URL on the device."""

    def __init__(self, cast, app, prep=None):
        """Create the DashCast controller, then run the base initializer.

        :param cast: cast device object, forwarded to the base class.
        :param app: app description, forwarded to the base class.
        :param prep: optional preparation mode, forwarded to the base class.
        """
        # Assigned before delegating to the base initializer, as in the
        # original ordering (presumably the base class uses it -- confirm).
        self._controller = PyChromecastDashCastController()
        super().__init__(cast, app, prep=prep)

    def load_url(self, url, **kwargs):
        """Load ``url`` through the DashCast controller with ``force=True``.

        Extra keyword arguments are accepted for interface compatibility
        and are ignored.
        """
        self._controller.load_url(url, force=True)

    def prep_app(self):
        """Make sure desired chromecast app is running."""
        # We must force the launch of the DashCast app because it, by design,
        # becomes unresponsive after a website is loaded.
        self._cast.start_app(self._cast_listener.app_id, force_launch=True)
        self._cast_listener.app_ready.wait()
class YoutubeCastController(CastController, MediaControllerMixin, PlaybackBaseMixin):
def __init__(self, cast, app, prep=None):
    """Create the YouTube controller and declare this app's capabilities.

    :param cast: cast device object, forwarded to the base class.
    :param app: app description, forwarded to the base class.
    :param prep: optional preparation mode, forwarded to the base class.
    """
    # Assigned before delegating to the base initializer, as in the
    # original ordering (presumably the base class uses it -- confirm).
    self._controller = YouTubeController()
    super().__init__(cast, app, prep=prep)

    # Capability flags read by code outside this block.
    self.info_type = "id"
    self.save_capability = "partial"
    self.playlist_capability = "complete"
def play_media_id(self, video_id):
    """Ask the controller to play the video identified by ``video_id``."""
    youtube = self._controller
    youtube.play_video(video_id)
def play_playlist(self, playlist_id, video_id):
    """Clear the current state, then play ``video_id`` from ``playlist_id``.

    ``self.clear()`` runs first; the controller's ``play_video`` is then
    called with the video id followed by the playlist id.
    """
    self.clear()
    youtube = self._controller
    youtube.play_video(video_id, playlist_id)
def add(self, video_id):
# You can't add videos to the queue while the app is buffering.
def wait_for(self, states: list, invert: bool = False, fail: bool = False, timeout: Optional[int] = None) -> bool:
    """Wait on the media player state using a ``MediaStatusListener``.

    A listener is built from the current player state and ``states``,
    registered with the cast's media controller, and then waited on.

    :param states: player states handed to the listener.
    :param invert: forwarded to the listener (presumably waits for the
        state to leave ``states`` instead -- confirm against the listener).
    :param fail: forwarded to the listener unchanged.
    :param timeout: forwarded to ``wait_for_states``; ``None`` by default.
    :returns: whatever ``wait_for_states`` returns.
    :raises CastError: if pychromecast reports ``UnsupportedNamespace``
        while waiting.
    """
    media_listener = MediaStatusListener(
        self._cast.media_controller.status.player_state, states, invert=invert, fail=fail
    )
    self._cast.media_controller.register_status_listener(media_listener)
    try:
        return media_listener.wait_for_states(timeout=timeout)
    except pychromecast.error.UnsupportedNamespace as err:
        # Chain explicitly so the underlying pychromecast error is kept
        # as the cause of the CastError.
        raise CastError("Chromecast app operation was interrupted") from err
def restore(self, data):
    """Not implemented here; always raises ``NotImplementedError``.

    :param data: saved state (unused).
    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
class DefaultCastController(CastController, MediaControllerMixin, PlaybackBaseMixin):
def __init__(self, cast, app, prep=None):
    """Run the base initializer and declare this app's capabilities.

    ``info_type`` is set to ``"url"``. ``save_capability`` is ``"complete"``
    only when ``self._is_seekable`` is truthy and the cast's app id equals
    ``DEFAULT_APP.id``; otherwise it is ``None``.

    :param cast: cast device object, forwarded to the base class.
    :param app: app description, forwarded to the base class.
    :param prep: optional preparation mode, forwarded to the base class.
    """
    super().__init__(cast, app, prep=prep)
    self.info_type = "url"
    if self._is_seekable and self._cast.app_id == DEFAULT_APP.id:
        self.save_capability = "complete"
    else:
        self.save_capability = None
def play_media_url(self, video_url, **kwargs):
    """Hand ``video_url`` to the controller's ``play_media``.

    Recognised keyword arguments:

    * ``content_type`` -- falls back to ``"video/mp4"`` when missing or falsy.
    * ``current_time``, ``title``, ``thumb``, ``subtitles`` -- forwarded as
      keyword arguments; each is ``None`` when not supplied.

    After ``play_media`` returns, ``block_until_active()`` is called on the
    controller.
    """
    mime_type = kwargs.get("content_type")
    if not mime_type:
        mime_type = "video/mp4"

    forwarded = {
        option: kwargs.get(option)
        for option in ("current_time", "title", "thumb", "subtitles")
    }
    self._controller.play_media(video_url, mime_type, **forwarded)
    self._controller.block_until_active()