Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_qq_streams(self, vid):
res = self.session.http.get(QQ_STREAM_INFO_URL % (vid, 1))
info = self.session.http.json(res, schema=_qq_schema)
yield "live", HTTPStream(self.session, info)
res = self.session.http.get(QQ_STREAM_INFO_URL % (vid, 2))
info = self.session.http.json(res, schema=_qq_schema)
yield "live", HLSStream(self.session, info)
from streamlink.compat import urlparse
from streamlink.exceptions import PluginError
from streamlink.plugin import Plugin
from streamlink.stream import (AkamaiHDStream, HDSStream, HLSStream,
HTTPStream, RTMPStream)
import ast
import re
PROTOCOL_MAP = {
"akamaihd": AkamaiHDStream,
"hds": HDSStream.parse_manifest,
"hls": HLSStream,
"hlsvariant": HLSStream.parse_variant_playlist,
"httpstream": HTTPStream,
"rtmp": RTMPStream,
"rtmpe": RTMPStream,
"rtmps": RTMPStream,
"rtmpt": RTMPStream,
"rtmpte": RTMPStream
}
PARAMS_REGEX = r"(\w+)=({.+?}|\[.+?\]|\(.+?\)|'(?:[^'\\]|\\')*'|\"(?:[^\"\\]|\\\")*\"|\S+)"
SCHEME_REGEX = re.compile(r"^\w+://(.+)")
class StreamURL(Plugin):
@classmethod
def can_handle_url(cls, url):
parsed = urlparse(url)
return parsed.scheme in PROTOCOL_MAP
# Extract audio streams from the DASH format list
for stream_info in info.get("adaptive_fmts", []):
if stream_info.get("s"):
protected = True
continue
stream_params = dict(parse_qsl(stream_info["url"]))
if "itag" not in stream_params:
continue
itag = int(stream_params["itag"])
# extract any high quality streams only available in adaptive formats
adaptive_streams[itag] = stream_info["url"]
stream_type, stream_format = stream_info["type"]
if stream_type == "audio":
stream = HTTPStream(self.session, stream_info["url"])
name = "audio_{0}".format(stream_format)
streams[name] = stream
# find the best quality audio stream m4a, opus or vorbis
if best_audio_itag is None or self.adp_audio[itag] > self.adp_audio[best_audio_itag]:
best_audio_itag = itag
if best_audio_itag and adaptive_streams and MuxedStream.is_usable(self.session):
aurl = adaptive_streams[best_audio_itag]
for itag, name in self.adp_video.items():
if itag in adaptive_streams:
vurl = adaptive_streams[itag]
log.debug("MuxedStream: v {video} a {audio} = {name}".format(
audio=best_audio_itag,
name=name,
video=itag,
def _create_stream(self, url, quality=None):
if url.startswith('rtmp://'):
return (quality, RTMPStream(self.session, {'rtmp': url}))
if url.endswith('.m3u8'):
return HLSStream.parse_variant_playlist(self.session, url).items()
return (quality, HTTPStream(self.session, url))
def _get_streams(self):
quality = ['source', 'medium', 'low']
for i in range(0, 3, 1):
if 'rtmp:' in rtmp_url:
stream = RTMPStream(self.session, {
"rtmp": rtmp_url,
"live": True
})
yield quality[i], stream
else:
yield quality[i], HTTPStream(self.session, rtmp_url)
p = urlparse(url)
if p.path.endswith("dash.mpd"):
# LIVE
url = self.session.http.get(url).json()["url"]
elif p.path.endswith("master.json"):
# VOD
url = url.replace("master.json", "master.mpd")
else:
log.error("Unsupported DASH path: {0}".format(p.path))
continue
for stream in DASHStream.parse_manifest(self.session, url).items():
streams.append(stream)
for stream in videos.get("progressive", []):
streams.append((stream["quality"], HTTPStream(self.session, stream["url"])))
if self.get_option("mux_subtitles") and data["request"].get("text_tracks"):
substreams = {
s["lang"]: HTTPStream(self.session, "https://vimeo.com" + s["url"])
for s in data["request"]["text_tracks"]
}
for quality, stream in streams:
yield quality, MuxedStream(self.session, stream, subtitles=substreams)
else:
for stream in streams:
yield stream
params = streamtype.get("params") or {}
try:
streams.update(parser(self.session, playlist, **params))
except IOError as err:
self.logger.error("Failed to extract {0} streams: {1}",
f.upper(), err)
# HTTP: Also make direct content URLs available for use.
http_formats = info["formats"].get("http")
if http_formats and "mp4" in http_formats:
for stream in http_formats["mp4"]:
p = stream["paths"][0]
url = "{0}/{1}".format(self._build_url(**p), p["filename"])
stream_name = "http_{0}k".format(stream["bitrate"])
streams[stream_name] = HTTPStream(self.session, url)
return streams
def _parse_streams(self, res):
for match in self._src_re.finditer(res.text):
stream_url = match.group("url")
if "\\/" in stream_url:
# if the URL is json encoded, decode it
stream_url = parse_json("\"{}\"".format(stream_url))
if ".mpd" in stream_url:
for s in DASHStream.parse_manifest(self.session, stream_url).items():
yield s
elif ".mp4" in stream_url:
yield match.group(1), HTTPStream(self.session, stream_url)
else:
log.debug("Non-dash/mp4 stream: {0}".format(stream_url))
match = self._dash_manifest_re.search(res.text)
if match:
# facebook replaces "<" characters with the substring "\\x3C"
manifest = match.group("manifest").replace("\\/", "/")
if is_py3:
manifest = bytes(unquote_plus(manifest), "utf-8").decode("unicode_escape")
else:
manifest = unquote_plus(manifest).decode("string_escape")
# Ignore unsupported manifests until DASH SegmentBase support is implemented
if "SegmentBase" in manifest:
log.error("Skipped DASH manifest with SegmentBase streams")
else:
for s in DASHStream.parse_manifest(self.session, manifest).items():