How to use the streamlink.plugin.api.validate.all function in streamlink

To help you get started, we’ve selected a few streamlink examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github streamlink / streamlink / src / streamlink / plugins / common_jwplayer.py View on Github external
from functools import partial

from streamlink.plugin.api import validate
from streamlink.plugin.api.utils import parse_json

__all__ = ["parse_playlist"]

_playlist_re = re.compile(r"\(?\{.*playlist: (\[.*\]),.*?\}\)?;", re.DOTALL)
_js_to_json = partial(re.compile(r"(\w+):\s").sub, r'"\1":')

_playlist_schema = validate.Schema(
    validate.transform(_playlist_re.search),
    validate.any(
        None,
        validate.all(
            validate.get(1),
            validate.transform(_js_to_json),
            validate.transform(parse_json),
            [{
                "sources": [{
                    "file": validate.text,
                    validate.optional("label"): validate.text
                }]
            }]
        )
    )
)


def parse_playlist(res):
    """Attempts to parse a JWPlayer playlist in a HTTP response body."""
github streamlink / streamlink / src / streamlink / plugins / cdnbg.py View on Github external
inlife\.bg|
            mmtvmusic\.com/live|
            mu-vi\.tv/LiveStreams/pages/Live\.aspx|
            videochanel\.bstv\.bg|
            live\.bstv\.bg|
            bloombergtv.bg/video
        )/?
    """, re.VERBOSE)
    iframe_re = re.compile(r"iframe .*?src=\"((?:https?(?::|:))?//(?:\w+\.)?cdn.bg/live[^\"]+)\"", re.DOTALL)
    sdata_re = re.compile(r"sdata\.src.*?=.*?(?P<q>[\"'])(?Phttp.*?)(?P=q)")
    hls_file_re = re.compile(r"(src|file): (?P<q>[\"'])(?P(https?:)?//.+?m3u8.*?)(?P=q)")
    hls_src_re = re.compile(r"video src=(?Phttp[^ ]+m3u8[^ ]*)")

    stream_schema = validate.Schema(
        validate.any(
            validate.all(validate.transform(sdata_re.search), validate.get("url")),
            validate.all(validate.transform(hls_file_re.search), validate.get("url")),
            validate.all(validate.transform(hls_src_re.search), validate.get("url")),
        )
    )

    @classmethod
    def can_handle_url(cls, url):
        return cls.url_re.match(url) is not None

    def find_iframe(self, res):
        p = urlparse(self.url)
        for url in self.iframe_re.findall(res.text):
            if "googletagmanager" not in url:
                url = url.replace(":", ":")
                if url.startswith("//"):
                    return "{0}:{1}".format(p.scheme, url)</q></q>
github streamlink / streamlink / src / streamlink / plugins / deutschewelle.py View on Github external
validate.xml_find(".//meta"),
                validate.xml_element(attrib={"base": validate.text}),
                validate.get("base")
            ),
            "streams": validate.all(
                validate.xml_findall(".//switch/*"),
                [
                    validate.all(
                        validate.getattr("attrib"),
                        {
                            "src": validate.text,
                            "system-bitrate": validate.all(
                                validate.text,
                                validate.transform(int),
                            ),
                            validate.optional("width"): validate.all(
                                validate.text,
                                validate.transform(int)
                            )
                        }
                    )
                ]
            )
        })
    )

    @classmethod
    def can_handle_url(cls, url):
        return cls.url_re.match(url) is not None

    def _create_stream(self, url, quality=None):
        if url.startswith('rtmp://'):
github streamlink / streamlink / src / streamlink / plugins / daisuki.py View on Github external
_language_schema = validate.Schema(
    validate.xml_findtext("./country_code")
)

_xml_to_srt_schema = validate.Schema(
    validate.xml_findall(".//body/div"),
    [
        validate.union([validate.all(
            validate.getattr("attrib"),
            validate.get("{http://www.w3.org/XML/1998/namespace}lang")
        ),
            validate.all(
                validate.xml_findall("./p"),
                validate.transform(lambda x: list(enumerate(x, 1))),
                [
                    validate.all(
                        validate.union({
                            "i": validate.get(0),
                            "begin": validate.all(
                                validate.get(1),
                                validate.getattr("attrib"),
                                validate.get("begin"),
                                validate.transform(lambda s: s.replace(".", ","))
                            ),
                            "end": validate.all(
                                validate.get(1),
                                validate.getattr("attrib"),
                                validate.get("end"),
                                validate.transform(lambda s: s.replace(".", ","))
                            ),
                            "text": validate.all(
                                validate.get(1),
github streamlink / streamlink / src / streamlink / plugins / livestation.py View on Github external
LOGIN_PAGE_URL = "http://www.livestation.com/en/users/new"
LOGIN_POST_URL = "http://www.livestation.com/en/sessions.json"

_csrf_token_re = re.compile(r"")
_url_re = re.compile(r"http(s)?://(\w+\.)?livestation.com")

_csrf_token_schema = validate.Schema(
    validate.transform(_csrf_token_re.search),
    validate.any(None, validate.get(1))
)
_hls_playlist_schema = validate.Schema(
    validate.transform(_hls_playlist_re.search),
    validate.any(
        None,
        validate.all(
            validate.get(1),
            validate.url(scheme="http", path=validate.endswith(".m3u8"))
        )
    )
)
_login_schema = validate.Schema({
    "email": validate.text,
    validate.optional("errors"): validate.all(
        {
            "base": [validate.text]
        },
        validate.get("base"),
    )
})
github streamlink / streamlink / src / streamlink / plugins / rtbf.py View on Github external
_video_player_re = re.compile(r'.+?)".*?', re.DOTALL)
    _video_stream_data_re = re.compile(r'
github streamlink / streamlink / src / streamlink / plugins / viasat.py View on Github external
from streamlink.plugin import Plugin
from streamlink.plugin.api import StreamMapper, validate
from streamlink.stream import HDSStream, HLSStream, RTMPStream
from streamlink.utils import rtmpparse

STREAM_API_URL = "https://playapi.mtgx.tv/v3/videos/stream/{0}"

_swf_url_re = re.compile(r"data-flashplayer-url=\"([^\"]+)\"")
_player_data_re = re.compile(r"window.fluxData\s*=\s*JSON.parse\(\"(.+)\"\);")

_stream_schema = validate.Schema(
    validate.any(
        None,
        validate.all({"msg": validate.text}),
        validate.all({
            "streams": validate.all(
                {validate.text: validate.any(validate.text, int, None)},
                validate.filter(lambda k, v: isinstance(v, validate.text))
            )
        }, validate.get("streams"))
    )
)


class Viasat(Plugin):
    """Streamlink Plugin for Viasat"""

    _iframe_re = re.compile(r"""[^"']+)["'].+allowfullscreen""")
    _image_re = re.compile(r"""\d+)/[^/]+\.jpg""")

    _url_re = re.compile(r"""https?://(?:www\.)?
        (?:
github streamlink / streamlink / src / streamlink / plugins / svtplay.py View on Github external
{
        "videoReferences": validate.all(
            [{
                "url": validate.text,
                "format": validate.text
            }],
        ),
    },
    validate.get("videoReferences")
)

# Old video schema
_old_video_schema = validate.Schema(
    {
        "video": {
            "videoReferences": validate.all(
                [{
                    "url": validate.text,
                    "playerType": validate.text
                }],
            ),
        }
    },
    validate.get("video"),
    validate.get("videoReferences")
)


class SVTPlay(Plugin):
    @classmethod
    def can_handle_url(self, url):
        return _url_re.match(url)
github streamlink / streamlink / src / streamlink / plugins / furstream.py View on Github external
import re

from streamlink.plugin import Plugin
from streamlink.plugin.api import http, validate
from streamlink.stream import RTMPStream

_url_re = re.compile(r"^http(s)?://(\w+\.)?furstre\.am/stream/.+")
_stream_url_re = re.compile(r"
github streamlink / streamlink / src / streamlink / plugins / atresplayer.py View on Github external
from streamlink.plugin import Plugin
from streamlink.plugin.api import validate, StreamMapper
from streamlink.stream import HLSStream, DASHStream
from streamlink.utils import parse_json, update_scheme, search_dict

log = logging.getLogger(__name__)


class AtresPlayer(Plugin):
    url_re = re.compile(r"https?://(?:www.)?atresplayer.com/")
    state_re = re.compile(r"""window.__PRELOADED_STATE__\s*=\s*({.*?});""", re.DOTALL)
    channel_id_schema = validate.Schema(
        validate.transform(state_re.search),
        validate.any(
            None,
            validate.all(
                validate.get(1),
                validate.transform(parse_json),
                validate.transform(partial(search_dict, key="urlVideo"))
            )
        )
    )
    stream_schema = validate.Schema(
        validate.transform(parse_json),
        {"sources": [
            validate.all({
                "src": validate.url(),
                validate.optional("type"): validate.text
            })
        ]}, validate.get("sources"))