Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from functools import partial
from streamlink.plugin.api import validate
from streamlink.plugin.api.utils import parse_json
__all__ = ["parse_playlist"]
_playlist_re = re.compile(r"\(?\{.*playlist: (\[.*\]),.*?\}\)?;", re.DOTALL)
_js_to_json = partial(re.compile(r"(\w+):\s").sub, r'"\1":')
_playlist_schema = validate.Schema(
validate.transform(_playlist_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.transform(_js_to_json),
validate.transform(parse_json),
[{
"sources": [{
"file": validate.text,
validate.optional("label"): validate.text
}]
}]
)
)
)
def parse_playlist(res):
"""Attempts to parse a JWPlayer playlist in a HTTP response body."""
inlife\.bg|
mmtvmusic\.com/live|
mu-vi\.tv/LiveStreams/pages/Live\.aspx|
videochanel\.bstv\.bg|
live\.bstv\.bg|
bloombergtv.bg/video
)/?
""", re.VERBOSE)
iframe_re = re.compile(r"iframe .*?src=\"((?:https?(?::|:))?//(?:\w+\.)?cdn.bg/live[^\"]+)\"", re.DOTALL)
sdata_re = re.compile(r"sdata\.src.*?=.*?(?P<q>[\"'])(?Phttp.*?)(?P=q)")
hls_file_re = re.compile(r"(src|file): (?P<q>[\"'])(?P(https?:)?//.+?m3u8.*?)(?P=q)")
hls_src_re = re.compile(r"video src=(?Phttp[^ ]+m3u8[^ ]*)")
stream_schema = validate.Schema(
validate.any(
validate.all(validate.transform(sdata_re.search), validate.get("url")),
validate.all(validate.transform(hls_file_re.search), validate.get("url")),
validate.all(validate.transform(hls_src_re.search), validate.get("url")),
)
)
@classmethod
def can_handle_url(cls, url):
return cls.url_re.match(url) is not None
def find_iframe(self, res):
p = urlparse(self.url)
for url in self.iframe_re.findall(res.text):
if "googletagmanager" not in url:
url = url.replace(":", ":")
if url.startswith("//"):
return "{0}:{1}".format(p.scheme, url)</q></q>
validate.xml_find(".//meta"),
validate.xml_element(attrib={"base": validate.text}),
validate.get("base")
),
"streams": validate.all(
validate.xml_findall(".//switch/*"),
[
validate.all(
validate.getattr("attrib"),
{
"src": validate.text,
"system-bitrate": validate.all(
validate.text,
validate.transform(int),
),
validate.optional("width"): validate.all(
validate.text,
validate.transform(int)
)
}
)
]
)
})
)
@classmethod
def can_handle_url(cls, url):
return cls.url_re.match(url) is not None
def _create_stream(self, url, quality=None):
if url.startswith('rtmp://'):
_language_schema = validate.Schema(
validate.xml_findtext("./country_code")
)
_xml_to_srt_schema = validate.Schema(
validate.xml_findall(".//body/div"),
[
validate.union([validate.all(
validate.getattr("attrib"),
validate.get("{http://www.w3.org/XML/1998/namespace}lang")
),
validate.all(
validate.xml_findall("./p"),
validate.transform(lambda x: list(enumerate(x, 1))),
[
validate.all(
validate.union({
"i": validate.get(0),
"begin": validate.all(
validate.get(1),
validate.getattr("attrib"),
validate.get("begin"),
validate.transform(lambda s: s.replace(".", ","))
),
"end": validate.all(
validate.get(1),
validate.getattr("attrib"),
validate.get("end"),
validate.transform(lambda s: s.replace(".", ","))
),
"text": validate.all(
validate.get(1),
LOGIN_PAGE_URL = "http://www.livestation.com/en/users/new"
LOGIN_POST_URL = "http://www.livestation.com/en/sessions.json"
_csrf_token_re = re.compile(r"")
_url_re = re.compile(r"http(s)?://(\w+\.)?livestation.com")
_csrf_token_schema = validate.Schema(
validate.transform(_csrf_token_re.search),
validate.any(None, validate.get(1))
)
_hls_playlist_schema = validate.Schema(
validate.transform(_hls_playlist_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.url(scheme="http", path=validate.endswith(".m3u8"))
)
)
)
_login_schema = validate.Schema({
"email": validate.text,
validate.optional("errors"): validate.all(
{
"base": [validate.text]
},
validate.get("base"),
)
})
_video_player_re = re.compile(r'.+?)".*?', re.DOTALL)
_video_stream_data_re = re.compile(r'
from streamlink.plugin import Plugin
from streamlink.plugin.api import StreamMapper, validate
from streamlink.stream import HDSStream, HLSStream, RTMPStream
from streamlink.utils import rtmpparse
STREAM_API_URL = "https://playapi.mtgx.tv/v3/videos/stream/{0}"
_swf_url_re = re.compile(r"data-flashplayer-url=\"([^\"]+)\"")
_player_data_re = re.compile(r"window.fluxData\s*=\s*JSON.parse\(\"(.+)\"\);")
_stream_schema = validate.Schema(
validate.any(
None,
validate.all({"msg": validate.text}),
validate.all({
"streams": validate.all(
{validate.text: validate.any(validate.text, int, None)},
validate.filter(lambda k, v: isinstance(v, validate.text))
)
}, validate.get("streams"))
)
)
class Viasat(Plugin):
"""Streamlink Plugin for Viasat"""
_iframe_re = re.compile(r"""[^"']+)["'].+allowfullscreen""")
_image_re = re.compile(r"""\d+)/[^/]+\.jpg""")
_url_re = re.compile(r"""https?://(?:www\.)?
(?:
{
"videoReferences": validate.all(
[{
"url": validate.text,
"format": validate.text
}],
),
},
validate.get("videoReferences")
)
# Old video schema
_old_video_schema = validate.Schema(
{
"video": {
"videoReferences": validate.all(
[{
"url": validate.text,
"playerType": validate.text
}],
),
}
},
validate.get("video"),
validate.get("videoReferences")
)
class SVTPlay(Plugin):
@classmethod
def can_handle_url(self, url):
return _url_re.match(url)
import re
from streamlink.plugin import Plugin
from streamlink.plugin.api import http, validate
from streamlink.stream import RTMPStream
_url_re = re.compile(r"^http(s)?://(\w+\.)?furstre\.am/stream/.+")
_stream_url_re = re.compile(r"
from streamlink.plugin import Plugin
from streamlink.plugin.api import validate, StreamMapper
from streamlink.stream import HLSStream, DASHStream
from streamlink.utils import parse_json, update_scheme, search_dict
log = logging.getLogger(__name__)
class AtresPlayer(Plugin):
url_re = re.compile(r"https?://(?:www.)?atresplayer.com/")
state_re = re.compile(r"""window.__PRELOADED_STATE__\s*=\s*({.*?});""", re.DOTALL)
channel_id_schema = validate.Schema(
validate.transform(state_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.transform(parse_json),
validate.transform(partial(search_dict, key="urlVideo"))
)
)
)
stream_schema = validate.Schema(
validate.transform(parse_json),
{"sources": [
validate.all({
"src": validate.url(),
validate.optional("type"): validate.text
})
]}, validate.get("sources"))