Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_parse_json(self):
    """Exercise parse_json with valid documents, a schema and malformed input."""
    valid_doc = """{"test": 1}"""
    broken_doc = """{"test: 1}"""  # unterminated key string -> invalid JSON

    # Well-formed documents decode to the equivalent Python objects.
    self.assertEqual({}, parse_json("{}"))
    self.assertEqual({"test": 1}, parse_json(valid_doc))
    # Same document again, this time with a schema supplied.
    self.assertEqual(
        {"test": 1},
        parse_json(valid_doc, schema=validate.Schema({"test": 1})),
    )

    # Malformed input raises PluginError when no exception class is given ...
    with self.assertRaises(PluginError):
        parse_json(broken_doc)
    # ... and the class passed through ``exception=`` otherwise.
    with self.assertRaises(IOError):
        parse_json(broken_doc, exception=IOError)
    # Repeated (longer) malformed input must fail the same way.
    with self.assertRaises(PluginError):
        parse_json(broken_doc * 10)
[{
validate.optional("s"): validate.text,
"type": validate.all(
validate.text,
validate.transform(lambda t: t.split(";")[0].split("/")),
[validate.text, validate.text]
),
"url": validate.all(
validate.url(scheme="http")
)
}]
),
validate.optional("hlsvp"): validate.text,
validate.optional("player_response"): validate.all(
validate.text,
validate.transform(parse_json),
{
validate.optional("streamingData"): {
validate.optional("hlsManifestUrl"): validate.text,
},
validate.optional("videoDetails"): {
validate.optional("isLive"): validate.transform(bool),
},
validate.optional("playabilityStatus"): {
validate.optional("status"): validate.text,
validate.optional("reason"): validate.all(validate.text,
validate.transform(maybe_decode)),
},
},
),
validate.optional("live_playback"): validate.transform(bool),
validate.optional("reason"): validate.all(validate.text, validate.transform(maybe_decode)),
import re
from streamlink.plugin import Plugin
from streamlink.plugin.api import useragents, validate
from streamlink.stream import HLSStream
from streamlink.utils import parse_json
class Vidio(Plugin):
# NOTE(review): the labels of the named groups (type, id, name, url) were
# missing from these three patterns, which made re.compile() raise as soon as
# the class body was executed. They are restored here; confirm the names
# against the .group(...) lookups elsewhere in the plugin.

# Plugin URL: optional "www." and "en/" parts, then "live" or "watch", a
# numeric id and a slug, e.g. https://www.vidio.com/live/204-some-channel
_url_re = re.compile(r"https?://(?:www\.)?vidio\.com/(?:en/)?(?P<type>live|watch)/(?P<id>\d+)-(?P<name>[^/?#&]+)")
# Captures the value of an hls-url="..." (or '...') attribute.
_playlist_re = re.compile(r'''hls-url=["'](?P<url>[^"']+)["']''')
# Captures the value of the data-id attribute that follows "meta".
_data_id_re = re.compile(r'''meta\s+data-id=["'](?P<id>[^"']+)["']''')
# Endpoint requested by get_csrf_tokens() to obtain a CSRF token.
csrf_tokens_url = "https://www.vidio.com/csrf_tokens"
# Endpoint template for a stream's tokens; "{id}" is filled in through
# str.format(id=...) with the stream id.
tokens_url = "https://www.vidio.com/live/{id}/tokens"
# Response schema: decode the body with parse_json, require a text "token"
# entry, and yield only that entry's value.
token_schema = validate.Schema(validate.transform(parse_json),
{"token": validate.text},
validate.get("token"))
@classmethod
def can_handle_url(cls, url):
    """Match *url* against the class-level ``_url_re`` pattern.

    :param url: URL string to test
    :return: the ``re`` match object, or ``None`` when the URL does not match
    """
    pattern = cls._url_re
    return pattern.match(url)
def get_csrf_tokens(self):
    """Request ``csrf_tokens_url`` and return the schema-validated result.

    The request goes through the session's HTTP client with
    ``token_schema`` passed as ``schema``; whatever that call returns is
    handed back unchanged.
    """
    http = self.session.http
    return http.get(self.csrf_tokens_url, schema=self.token_schema)
def get_url_tokens(self, stream_id):
self.logger.debug("Getting stream tokens")
csrf_token = self.get_csrf_tokens()
return self.session.http.post(self.tokens_url.format(id=stream_id),
files={"authenticity_token": (None, csrf_token)},
# Schema applied to a string; validate.union builds a dict whose two entries
# are each derived from that same input:
#   "position": group 1 of playlist_position_re converted to int, or None
#               when the pattern is not found.
#   "playlist": group 1 of video_collection_re decoded with parse_json, or
#               None when the pattern is not found.
video_collection_schema = validate.Schema(
validate.union({
"position": validate.all(
validate.transform(playlist_position_re.search),
validate.any(
None,
validate.all(validate.get(1), validate.transform(int))
)
),
"playlist": validate.all(
validate.transform(video_collection_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.transform(parse_json)
)
)
)
})
)
arguments = PluginArguments(
PluginArgument(
"email",
requires=["password"],
metavar="EMAIL",
help="The email address used to register with animelab.com."
),
PluginArgument(
"password",
sensitive=True,
metavar="PASSWORD",
from streamlink.utils import parse_json
log = logging.getLogger(__name__)
class Vimeo(Plugin):
# Plugin URL: player.vimeo.com/video/<digits>, or anything under vimeo.com/
# (with an optional "www." prefix).
_url_re = re.compile(r"https?://(player\.vimeo\.com/video/\d+|(www\.)?vimeo\.com/.+)")
# Captures the double-quoted value (quotes included) that follows either a
# "config_url" key or a data-config-url attribute, separated by ":" or "=".
_config_url_re = re.compile(r'(?:"config_url"|\bdata-config-url)\s*[:=]\s*(".+?")')
# Captures the object literal assigned in a `var config = {...};` statement.
_config_re = re.compile(r"var\s+config\s*=\s*({.+?})\s*;")
# Extracts the config URL from a string: None when _config_url_re does not
# match; otherwise group 1 is decoded with parse_json (it is a quoted string),
# passed through html_unescape (presumably HTML-entity unescaping) and must
# then validate as a URL.
_config_url_schema = validate.Schema(
validate.transform(_config_url_re.search),
validate.any(
None,
validate.Schema(
validate.get(1),
validate.transform(parse_json),
validate.transform(html_unescape),
validate.url(),
),
),
)
_config_schema = validate.Schema(
validate.transform(parse_json),
{
"request": {
"files": {
validate.optional("dash"): {"cdns": {validate.text: {"url": validate.url()}}},
validate.optional("hls"): {"cdns": {validate.text: {"url": validate.url()}}},
validate.optional("progressive"): validate.all(
[{"url": validate.url(), "quality": validate.text}]
),
},
# Expected shape of the geo data: a dict with text 'country' and 'zone'
# entries. (Where this schema is applied is not visible in this section.)
_geo_schema = validate.Schema(
{
'country': validate.text,
'zone': validate.text
}
)
_video_stream_schema = validate.Schema(
validate.transform(_video_stream_data_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.transform(html_unescape),
validate.transform(parse_json),
{
'geoLocRestriction': validate.text,
validate.optional('isLive'): bool,
validate.optional('startDate'): validate.text,
validate.optional('endDate'): validate.text,
'sources': validate.any(
[],
validate.Schema({
validate.text: validate.any(None, '', validate.url())
})
),
validate.optional('urlHls'): validate.any(None, '', validate.url()),
validate.optional('urlDash'): validate.any(None, '', validate.url()),
validate.optional('streamUrlHls'): validate.any(None, '', validate.url()),
validate.optional('streamUrlDash'): validate.any(None, '', validate.url()),
validate.optional('drm'): bool,
def _get_page_config(self):
    """Return the page config, refreshing it from the page when present.

    The page is fetched with ``_get_page()`` and searched with
    ``_config_re``. On a match, group 1 is decoded with ``parse_json`` and
    stored in ``self._pconfig``. In either case the current
    ``self._pconfig`` is returned.
    """
    page = self._get_page()
    found = self._config_re.search(page.text)
    if not found:
        # Nothing embedded in this page: fall back to the stored value.
        return self._pconfig
    self._pconfig = parse_json(found.group(1))
    return self._pconfig
class Schoolism(Plugin):
# Plugin URL: schoolism.com viewAssignment / watchLesson pages.
url_re = re.compile(r"https?://(?:www\.)?schoolism\.com/(viewAssignment|watchLesson).php")
login_url = "https://www.schoolism.com/index.php"
key_time_url = "https://www.schoolism.com/video-html/key-time.php"
# Captures the array literal assigned in `var allVideos = [...];`; DOTALL lets
# the literal span several lines.
playlist_re = re.compile(r"var allVideos\s*=\s*(\[.*\]);", re.DOTALL)
# Wraps bare `word:` keys (when the colon is not followed by "/") in double
# quotes, so the JavaScript literal becomes parseable JSON.
# NOTE(review): `(?!<")` is a negative *lookahead* for the two characters `<"`;
# placed directly before `\w+` it can never reject anything. A negative
# lookbehind `(?<!")` may have been intended - confirm before changing, since
# that would alter which keys get quoted.
js_to_json = partial(re.compile(r'(?!<")(\w+):(?!/)').sub, r'"\1":')
# Replaces a comma followed by optional whitespace and "}" with just "}".
fix_brackets = partial(re.compile(r',\s*\}').sub, r'}')
# Extracts the video playlist from a string: None when playlist_re does not
# match; otherwise group 1 is converted to JSON text (js_to_json, then
# fix_brackets), decoded with parse_json and validated as a list of dicts,
# each holding a "sources" list of {title, src, type[, playlistTitle]} entries.
playlist_schema = validate.Schema(
validate.transform(playlist_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.transform(js_to_json),
validate.transform(fix_brackets), # remove invalid trailing commas
validate.transform(parse_json),
[{
"sources": validate.all([{
validate.optional("playlistTitle"): validate.text,
"title": validate.text,
"src": validate.text,
"type": validate.text,
}],
# only include HLS streams
# (filter currently disabled, so every source type passes through)
# validate.filter(lambda s: s["type"] == "application/x-mpegurl")
)
}]
)
)
)
arguments = PluginArguments(
{
validate.optional('html5'): validate.all(
[
{
'src': validate.url()
},
],
),
'hls': validate.url(),
'hds': validate.url()
},
validate.transform(lambda x: [update_scheme(IDF1.DACAST_API_URL, x['hls']), x['hds']] + [y['src'] for y in x.get('html5', [])])
)
# Token schema: decode the input with parse_json, require a text 'token'
# entry, and yield only that entry's value.
_token_schema = validate.Schema(
validate.transform(parse_json),
{'token': validate.text},
validate.get('token')
)
# User-Agent value taken from the useragents module's IE_11 constant.
_user_agent = useragents.IE_11
@classmethod
def can_handle_url(cls, url):
    """Return the match of the plugin URL pattern against *url*.

    The pattern is read from ``cls`` and not from a hard-coded class name,
    so a subclass that overrides ``_url_re`` is matched against its own
    pattern. For the defining class itself the result is unchanged.

    :param url: URL string to test
    :return: the ``re`` match object, or ``None`` when the URL is not handled
    """
    return cls._url_re.match(url)
def _get_streams(self):
res = self.session.http.get(self.url)
match = self._video_id_re.search(res.text) or self._video_id_alt_re.search(res.text)
if match is None:
return
broadcaster_id = match.group('broadcaster_id')
def _get_streams(self):
res = http.get(self.url)
match = _info_re.search(res.text)
if not match:
return
info = parse_json(match.group(1), schema=_schema)
stream_name = info["mode"]
mp4_url = info.get("mp4_url")
ios_url = info.get("ios_url")
swf_url = info.get("swf_url")
if mp4_url:
stream = HTTPStream(self.session, mp4_url)
yield stream_name, stream
if ios_url:
if urlparse(ios_url).path.endswith(".m3u8"):
streams = HLSStream.parse_variant_playlist(self.session, ios_url)
# TODO: Replace with "yield from" when dropping Python 2.
for stream in streams.items():
yield stream