Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_dict_optional_keys(self):
assert validate({"a": 1, optional("b"): 2}, {"a": 1}) == {"a": 1}
assert validate({"a": 1, optional("b"): 2},
{"a": 1, "b": 2}) == {"a": 1, "b": 2}
import re
from streamlink.compat import urlparse
from streamlink.plugin import Plugin
from streamlink.plugin.api import http, validate
from streamlink.stream import RTMPStream, HTTPStream, HLSStream
from streamlink.utils import parse_json, rtmpparse, swfdecompress
_url_re = re.compile(r"http(s)?://api.dmcloud.net/player/embed/[^/]+/[^/]+")
_rtmp_re = re.compile(br'customURL[^h]+(https://.*?)\\')
_info_re = re.compile(r"var info = (.*);")
_schema = validate.Schema(
{
"mode": validate.text,
validate.optional("mp4_url"): validate.url(scheme="http"),
validate.optional("ios_url"): validate.url(scheme="http"),
validate.optional("swf_url"): validate.url(scheme="http"),
}
)
class DMCloud(Plugin):
@classmethod
def can_handle_url(self, url):
return _url_re.match(url)
def _get_rtmp_stream(self, swfurl):
res = http.get(swfurl)
swf = swfdecompress(res.content)
match = _rtmp_re.search(swf)
if not match:
)
_player_schema = validate.Schema(
{
"clip": {
"baseUrl": validate.any(None, validate.text),
"bitrates": validate.all(
validate.filter(lambda b: b.get("url") and b.get("label")),
[{
"label": validate.text,
"url": validate.text,
}],
)
},
validate.optional("playlist"): [{
validate.optional("connectionProvider"): validate.text,
validate.optional("netConnectionUrl"): validate.text,
validate.optional("bitrates"): [{
"label": validate.text,
"url": validate.text,
"provider": validate.text
}]
}],
"plugins": validate.all(
dict,
validate.filter(lambda k, v: k in ["rtmp", "rtmpHitbox", "hls"]),
{
validate.text: {
validate.optional("netConnectionUrl"): validate.text,
"url": validate.text
}
}
)
),
"url": validate.all(
validate.url(scheme="http")
)
}]
),
validate.optional("hlsvp"): validate.text,
validate.optional("player_response"): validate.all(
validate.text,
validate.transform(parse_json),
{
validate.optional("streamingData"): {
validate.optional("hlsManifestUrl"): validate.text,
},
validate.optional("videoDetails"): {
validate.optional("isLive"): validate.transform(bool),
},
validate.optional("playabilityStatus"): {
validate.optional("status"): validate.text,
validate.optional("reason"): validate.all(validate.text,
validate.transform(maybe_decode)),
},
},
),
validate.optional("live_playback"): validate.transform(bool),
validate.optional("reason"): validate.all(validate.text, validate.transform(maybe_decode)),
validate.optional("livestream"): validate.text,
validate.optional("live_playback"): validate.text,
validate.optional("author"): validate.all(validate.text,
validate.transform(maybe_decode)),
validate.optional("title"): validate.all(validate.text,
validate.transform(maybe_decode)),
"is_live": bool,
"qualities": [{
"bitrate": int,
"height": int
}],
validate.optional("play_url"): validate.url(scheme="http"),
validate.optional("m3u8_url"): validate.url(
scheme="http",
path=validate.endswith(".m3u8")
),
}, None)
},
validate.optional("playerUri"): validate.text,
validate.optional("viewerPlusSwfUrl"): validate.url(scheme="http"),
validate.optional("lsPlayerSwfUrl"): validate.text,
validate.optional("hdPlayerSwfUrl"): validate.text
})
_smil_schema = validate.Schema(validate.union({
"http_base": validate.all(
validate.xml_find("{http://www.w3.org/2001/SMIL20/Language}head/"
"{http://www.w3.org/2001/SMIL20/Language}meta"
"[@name='httpBase']"),
validate.xml_element(attrib={
"content": validate.text
}),
validate.get("content")
),
"videos": validate.all(
validate.xml_findall("{http://www.w3.org/2001/SMIL20/Language}body/"
"{http://www.w3.org/2001/SMIL20/Language}switch/"
"{http://www.w3.org/2001/SMIL20/Language}video"),
[
cnnturk.com/canli-yayin|
dreamtv.com.tr/canli-yayin|
dreamturk.com.tr/canli)
""", re.VERBOSE)
playerctrl_re = re.compile(r''']*?ng-controller=(?P["'])(?:Live)?PlayerCtrl(?P=quote).*?>''', re.DOTALL)
data_id_re = re.compile(r'''data-id=(?P["'])(?P\w+)(?P=quote)''')
content_id_re = re.compile(r'"content(?:I|i)d", "(\w+)"')
item_id_re = re.compile(r"_itemId\s+=\s+'(\w+)';")
content_api = "/actions/content/media/{id}"
new_content_api = "/action/media/{id}"
content_api_schema = validate.Schema({
"Id": validate.text,
"Media": {
"Link": {
"DefaultServiceUrl": validate.url(),
validate.optional("ServiceUrl"): validate.any(validate.url(), ""),
"SecurePath": validate.text,
}
}
})
@classmethod
def can_handle_url(cls, url):
return cls.url_re.match(url) is not None
def _get_content_id(self):
res = self.session.http.get(self.url)
# find the contentId
content_id_m = self.content_id_re.search(res.text)
if content_id_m:
self.logger.debug("Found contentId by contentId regex")
return content_id_m.group(1)
},
validate.optional("videoDetails"): {
validate.optional("isLive"): validate.transform(bool),
},
validate.optional("playabilityStatus"): {
validate.optional("status"): validate.text,
validate.optional("reason"): validate.all(validate.text,
validate.transform(maybe_decode)),
},
},
),
validate.optional("live_playback"): validate.transform(bool),
validate.optional("reason"): validate.all(validate.text, validate.transform(maybe_decode)),
validate.optional("livestream"): validate.text,
validate.optional("live_playback"): validate.text,
validate.optional("author"): validate.all(validate.text,
validate.transform(maybe_decode)),
validate.optional("title"): validate.all(validate.text,
validate.transform(maybe_decode)),
"status": validate.text
}
)
_ytdata_re = re.compile(r'window\["ytInitialData"\]\s*=\s*({.*?});', re.DOTALL)
_url_re = re.compile(r"""(?x)https?://(?:\w+\.)?youtube\.com
(?:
(?:
/(?:
watch.+v=
|
embed/(?!live_stream)
|
"original": 1080,
"hd": 720,
"sd": 480
}
_url_re = re.compile(r"https?://play\.afreecatv\.com/(?P\w+)(?:/\d+)?")
_channel_schema = validate.Schema(
{
"CHANNEL": {
"RESULT": validate.transform(int),
validate.optional("BPWD"): validate.text,
validate.optional("BNO"): validate.text,
validate.optional("RMD"): validate.text,
validate.optional("AID"): validate.text,
validate.optional("CDN"): validate.text
}
},
validate.get("CHANNEL")
)
_stream_schema = validate.Schema(
{
validate.optional("view_url"): validate.url(
scheme=validate.any("rtmp", "http")
),
"stream_status": validate.text
}
)
class AfreecaTV(Plugin):
from streamlink.utils import parse_json, update_scheme
class IDF1(Plugin):
DACAST_API_URL = 'https://json.dacast.com/b/{}/{}/{}'
DACAST_TOKEN_URL = 'https://services.dacast.com/token/i/b/{}/{}/{}'
_url_re = re.compile(r'https?://www\.idf1\.fr/(videos/[^/]+/[^/]+\.html|live\b)')
_video_id_re = re.compile(r"dacast\('(?P\d+)_(?P[a-z]+)_(?P\d+)', 'replay_content', data\);")
_video_id_alt_re = re.compile(r'
)
''', re.VERBOSE)
_live_player_re = re.compile(r'{APP_BUNDLE:"(?P.+?/app.js)"')
_js_to_json_re = partial(re.compile(r'(\w+):(["\']|\d?\.?\d+,|true|false|\[|{)').sub, r'"\1":\2')
_video_id_re = re.compile(r'data-bmmr-id=\\"(?P.+?)\\"')
_mp4_bitrate_re = re.compile(r'.*_(?P[0-9]+)\.mp4')
_preload_state_re = re.compile(r'window.__PRELOADED_STATE__\s*=\s*({.*});', re.DOTALL)
_live_stream_info_re = re.compile(r'12:.*t.exports=({.*})},{}],13:', re.DOTALL)
_live_streams_schema = validate.Schema(
validate.transform(_js_to_json_re),
validate.transform(lambda x: x.replace(':.', ':0.')),
validate.transform(parse_json),
validate.Schema({
validate.text: {
validate.optional('cdns'): validate.all(
[
validate.Schema(
{
'streams': validate.all([
validate.Schema(
{'url': validate.transform(lambda x: re.sub(r'(https?:/)([^/])', r'\1/\2', x))},
validate.get('url'),
validate.url()
)
]),
},
validate.get('streams')
)
],
validate.transform(lambda x: [i for y in x for i in y])
)