Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from __future__ import print_function
import re
import string
from base64 import b64decode
from pprint import pprint
from streamlink import PluginError
from streamlink.plugin import Plugin
from streamlink.plugin.api import useragents
from streamlink.plugin.api import validate
from streamlink.stream import HLSStream
from streamlink.compat import urlparse
from streamlink.utils import parse_json
class ovvaTV(Plugin):
url_re = re.compile(r"https?://(?:www\.)?ovva.tv/(?:ua/)?tvguide/.*?/online")
iframe_re = re.compile(r"iframe .*?src=\"((?:https?:)?//(?:\w+\.)?ovva.tv/[^\"]+)\"", re.DOTALL)
data_re = re.compile(r"ovva\(\'(.*?)\'\);")
ovva_data_schema = validate.Schema({
"url": validate.url()
}, validate.get("url"))
ovva_redirect_schema = validate.Schema(validate.all(
validate.transform(lambda x: x.split("=")),
['302', validate.url()],
validate.get(1)
))
@classmethod
def can_handle_url(cls, url):
return cls.url_re.match(url) is not None
from __future__ import print_function
import re
from streamlink.compat import urlparse, parse_qsl, urlunparse
from streamlink.plugin import Plugin
from streamlink.stream import HLSStream
class Swisstxt(Plugin):
url_re = re.compile(r"""https?://(?:
live\.(rsi)\.ch/|
(?:www\.)?(srf)\.ch/sport/resultcenter
)""", re.VERBOSE)
api_url = "http://event.api.swisstxt.ch/v1/stream/{site}/byEventItemIdAndType/{id}/HLS"
@classmethod
def can_handle_url(cls, url):
return cls.url_re.match(url) is not None and cls.get_event_id(url)
@classmethod
def get_event_id(cls, url):
return dict(parse_qsl(urlparse(url).query.lower())).get("eventid")
def get_stream_url(self, event_id):
url_m = self.url_re.match(self.url)
@Plugin.broken()
def _get_streams(self):
match = self.url_re.match(self.url)
res = self.session.http.post(self.flashvars_url,
data=dict(
broadcaster=match.group("broadcaster") or "Verm",
stream_id=match.group("channel_id") or match.group("highlight_id")))
flashvars = self.session.http.json(res, schema=self.flashvars_schema)
if flashvars.get("pereakaurl"):
url = self.pereakaurl.format(flashvars.get("pereakaurl").strip("/"))
return HLSStream.parse_variant_playlist(self.session, url)
elif flashvars.get("akaurl"):
url = self.akaurl.format(flashvars.get("akaurl").strip("/"))
import re
from uuid import uuid4
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from Crypto.Util.Padding import pad, unpad
from streamlink import PluginError
from streamlink.compat import urljoin, urlparse
from streamlink.plugin import Plugin, PluginArguments, PluginArgument
from streamlink.stream import HLSStream
log = logging.getLogger(__name__)
class USTVNow(Plugin):
_url_re = re.compile(r"https?://(?:www\.)?ustvnow\.com/live/(?P\w+)/-(?P\d+)")
_main_js_re = re.compile(r"""src=['"](main\..*\.js)['"]""")
_enc_key_re = re.compile(r'(?PAES_(?:Key|IV))\s*:\s*"(?P[^"]+)"')
TENANT_CODE = "ustvnow"
_api_url = "https://teleupapi.revlet.net/service/api/v1/"
_token_url = _api_url + "get/token"
_signin_url = "https://www.ustvnow.com/signin"
arguments = PluginArguments(
PluginArgument(
"username",
metavar="USERNAME",
required=True,
help="Your USTV Now account username"
),
def decrypt(self, data):
return self.unpad(self.cipher.decrypt(base64.b64decode(data, altchars=b"-_")))
def request(self, data, *args, **kwargs):
res = self.session.http.get(self.base_url + self.encrypt(data), *args, **kwargs)
return self.decrypt(res.content)
def get_cdn_list(self, vid, manager="apedemak", vtype="video", lang="es", schema=None):
data = self.request("{id}_{manager}_{type}_{lang}".format(id=vid, manager=manager, type=vtype, lang=lang))
if schema:
return schema.validate(data)
else:
return data
class Rtve(Plugin):
secret_key = base64.b64decode("eWVMJmRhRDM=")
url_re = re.compile(r"""
https?://(?:www\.)?rtve\.es/(?:directo|noticias|television|deportes|alacarta|drmn)/.*?/?
""", re.VERBOSE)
cdn_schema = validate.Schema(
validate.transform(partial(parse_xml, invalid_char_entities=True)),
validate.xml_findall(".//preset"),
[
validate.union({
"quality": validate.all(validate.getattr("attrib"),
validate.get("type")),
"urls": validate.all(
validate.xml_findall(".//url"),
[validate.getattr("text")]
)
})
import re
from streamlink import PluginError
from streamlink.plugin import Plugin, PluginArguments, PluginArgument
from streamlink.plugin.api import validate
from streamlink.stream import HLSStream
from streamlink.stream import RTMPStream
from streamlink.compat import urljoin
class LiveEdu(Plugin):
login_url = "https://www.liveedu.tv/accounts/login/"
url_re = re.compile(r"https?://(?:\w+\.)?(?:livecoding|liveedu)\.tv/")
config_re = re.compile(r"""\Wconfig.(?P\w+)\s*=\s*(?P<q>['"])(?P.*?)(?P=q);""")
csrf_re = re.compile(r'''"csrfToken"\s*:\s*"(\w+)"''')
api_schema = validate.Schema({
"viewing_urls": {
validate.optional("error"): validate.text,
validate.optional("urls"): [{
"src": validate.url(),
"type": validate.text,
validate.optional("res"): int,
validate.optional("label"): validate.text,
}]
}
})
config_schema = validate.Schema({</q>
schema=schema
)
# Unsupported/Removed private API calls
def channel_viewer_info(self, channel, **params):
warnings.warn("The channel_viewer_info API call is unsupported and may stop working at any time")
return self.call("/api/channels/{0}/viewer".format(channel), private=True, **params)
def channel_subscription(self, channel, **params):
warnings.warn("The channel_subscription API call has been removed and no longer works",
category=DeprecationWarning)
return self.call("/api/channels/{0}/subscription".format(channel), private=True, **params)
class Twitch(Plugin):
arguments = PluginArguments(
PluginArgument("oauth-token",
sensitive=True,
metavar="TOKEN",
help="""
An OAuth token to use for Twitch authentication.
Use --twitch-oauth-authenticate to create a token.
"""),
PluginArgument("cookie",
sensitive=True,
metavar="COOKIES",
help="""
Twitch cookies to authenticate to allow access to subscription channels.
Example:
from __future__ import print_function
import re
from streamlink.plugin import Plugin
from streamlink.plugin.api import validate
from streamlink.stream import HLSStream, RTMPStream
from streamlink.utils import parse_json, update_scheme
class EarthCam(Plugin):
url_re = re.compile(r"https?://(?:www.)?earthcam.com/.*")
playpath_re = re.compile(r"(?P/.*/)(?P.*?\.flv)")
swf_url = "http://static.earthcam.com/swf/streaming/stream_viewer_v3.swf"
json_base_re = re.compile(r"""var[ ]+json_base[^=]+=.*?(\{.*?});""", re.DOTALL)
cam_name_re = re.compile(r"""var[ ]+currentName[^=]+=[ \t]+(?P["'])(?P\w+)(?P=quote);""", re.DOTALL)
cam_data_schema = validate.Schema(
validate.transform(json_base_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.transform(lambda d: d.replace("\\/", "/")),
validate.transform(parse_json),
)
)
)
import re
from streamlink.plugin import Plugin
from streamlink.plugin.api import useragents, validate
from streamlink.stream import HLSStream
from streamlink.utils import parse_json, update_scheme
class IDF1(Plugin):
DACAST_API_URL = 'https://json.dacast.com/b/{}/{}/{}'
DACAST_TOKEN_URL = 'https://services.dacast.com/token/i/b/{}/{}/{}'
_url_re = re.compile(r'https?://www\.idf1\.fr/(videos/[^/]+/[^/]+\.html|live\b)')
_video_id_re = re.compile(r"dacast\('(?P\d+)_(?P[a-z]+)_(?P\d+)', 'replay_content', data\);")
_video_id_alt_re = re.compile(r'
import re
from streamlink.plugin import Plugin
from streamlink.stream import HLSStream
_RE_URL = re.compile(r'^https?://streamboat.tv/.+')
_RE_CDN = re.compile(r'"cdn_host"\s*:\s*"([^"]+)"')
_RE_PLAYLIST = re.compile(r'"playlist_url"\s*:\s*"([^"]+)"')
class StreamBoat(Plugin):
@classmethod
def can_handle_url(cls, url):
return bool(_RE_URL.match(url))
def _get_streams(self):
res = self.session.http.get(self.url)
text = res.text
cdn = _RE_CDN.search(text).group(1)
playlist_url = _RE_PLAYLIST.search(text).group(1)
url = 'http://{}{}'.format(cdn, playlist_url)
return dict(source=HLSStream(self.session, url))
__plugin__ = StreamBoat