Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
(?:
(atvavrupa)\.tv
|
(atv|a2tv|ahaber|aspor|minikago|minikacocuk|anews)\.com\.tr
)/webtv/(?:live-broadcast|canli-yayin)
|
sabah\.com\.tr/(apara)/canli-yayin
)""")
# HLS playlist URL template; the "{channel}" placeholder appears twice
# (once as a path segment, once as the playlist file name).
_hls_url = "https://trkvz-live.ercdn.net/{channel}/{channel}.m3u8"
# Endpoint of the secure video token service.
# NOTE(review): presumably used to obtain a signed stream URL - the calling
# code is not visible here, confirm against _get_streams.
_token_url = "https://securevideotoken.tmgrup.com.tr/webtv/secure"
# Validates the token service response: "Success" must be True and "Url"
# must be a URL; on success the schema yields only the "Url" value.
_token_schema = validate.Schema(validate.all(
{
"Success": True,
"Url": validate.url(),
},
validate.get("Url"))
)
@classmethod
def can_handle_url(cls, url):
    """Report whether ``url`` is matched by the class pattern ``_url_re``.

    Returns ``True`` when ``cls._url_re`` matches at the start of ``url``
    and ``False`` otherwise.
    """
    matched = cls._url_re.match(url)
    return matched is not None
def _get_streams(self):
url_m = self._url_re.match(self.url)
domain = url_m.group(1) or url_m.group(2) or url_m.group(3)
# remap the domain to channel
channel = {"atv": "atvhd",
"ahaber": "ahaberhd",
"apara": "aparahd",
"aspor": "asporhd",
"anews": "anewshd",
"minikacocuk": "minikagococuk"}.get(domain, domain)
# Matches zhanqi.tv room URLs (optional "www." prefix, http or https) and
# captures the first path segment as the named group "channel", which
# _get_streams reads via match.group("channel").
# The group previously had no "<channel>" name, which is not valid regex
# syntax, and the dot in the host name was unescaped (matching any character).
_url_re = re.compile(r"""
    http(s)?://(www\.)?zhanqi\.tv
    /(?P<channel>[^/]+)
""", re.VERBOSE)
# Schema for the room API response. "data" may be None or an object whose
# "status" is text converted to int and whose "videoId" is text; the schema
# yields the value stored under "data".
_room_schema = validate.Schema(
{
"data": validate.any(None, {
"status": validate.all(
validate.text,
validate.transform(int)
),
"videoId": validate.text
})
},
validate.get("data")
)
class Zhanqitv(Plugin):
@classmethod
def can_handle_url(self, url):
return _url_re.match(url)
def _get_streams(self):
match = _url_re.match(self.url)
channel = match.group("channel")
res = self.session.http.get(API_URL.format(channel))
room = self.session.http.json(res, schema=_room_schema)
if not room:
self.logger.info("Not a valid room url.")
# Matches streamingvideoprovider.co.uk URLs (any single sub-domain label,
# http or https) and captures the first path segment, stopping at "/", "&"
# or "?".
# The named group had lost its name, which made the pattern invalid.
# NOTE(review): the name "channel" is restored by assumption - confirm it
# matches the group name the plugin reads.
# The dots in the host name are escaped so they only match a literal ".".
_url_re = re.compile(
    r"http(s)?://(\w+\.)?streamingvideoprovider\.co\.uk/(?P<channel>[^/&?]+)"
)
# Captures a single-quoted http:// URL that ends in ".m3u8".
_hls_re = re.compile(r"'(http://.+\.m3u8)'")
# Takes the text found at "./info/url" in the XML input and requires it to
# be a URL with the "rtmp" scheme.
_rtmp_schema = validate.Schema(
validate.xml_findtext("./info/url"),
validate.url(scheme="rtmp")
)
# Searches the input with _hls_re. The result is either None (no match) or
# capture group 1, which must be an http URL whose path ends with "m3u8".
_hls_schema = validate.Schema(
validate.transform(_hls_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.url(
scheme="http",
path=validate.endswith("m3u8")
)
)
)
)
class Streamingvideoprovider(Plugin):
@classmethod
def can_handle_url(self, url):
return _url_re.match(url)
def _get_hls_stream(self, channel_name):
params = {
# Matches gaminglive.tv URLs of the form /channels/<segment> or
# /videos/<segment> (any single sub-domain label, http or https).
# Both named groups had lost their names, which made the pattern invalid.
# NOTE(review): the names "type" (channels|videos) and "name" (the following
# path segment) are restored by assumption - confirm against the code that
# reads the match.
_url_re = re.compile(r"""
    http(s)?://(\w+\.)?gaminglive\.tv
    /(?P<type>channels|videos)/(?P<name>[^/]+)
""", re.VERBOSE)
# Captures the part after the last "-" of a slash-free string, e.g. the
# "720p" in "stream-720p".
# The named group had lost its name, which made the pattern invalid.
# NOTE(review): the name "quality" is restored by assumption (from the
# variable name) - confirm against the code that reads the match.
_quality_re = re.compile(r"[^/]+-(?P<quality>[^/]+)")
# Schema for a channel response: an optional "state" object containing a
# "stream" object with a list of text "qualities" and an rtmp "rootUrl".
# The schema yields the value stored under "state".
_channel_schema = validate.Schema(
{
validate.optional("state"): {
"stream": {
"qualities": [validate.text],
"rootUrl": validate.url(scheme="rtmp")
}
}
},
validate.get("state")
)
# Schema for a VOD response: "name", "channel_slug" and "title" must be
# text, and "created_at" is passed through int().
_vod_schema = validate.Schema(
{
"name": validate.text,
"channel_slug": validate.text,
"title": validate.text,
"created_at": validate.transform(int)
},
)
# Plugin subclass; only can_handle_url is visible in this excerpt.
class GamingLive(Plugin):
# NOTE: the first parameter is named `self`, but because of @classmethod it
# receives the class rather than an instance.
@classmethod
def can_handle_url(self, url):
# Returns the match object produced by the module-level _url_re (truthy when
# the URL matches) or None when it does not.
return _url_re.match(url)
validate.transform(dict),
{
"s": validate.text,
"country": validate.text,
"init": validate.text,
validate.optional("ss_id"): validate.text,
validate.optional("mv_id"): validate.text,
validate.optional("device_cd"): validate.text,
validate.optional("ss1_prm"): validate.text,
validate.optional("ss2_prm"): validate.text,
validate.optional("ss3_prm"): validate.text
}
),
"clientlibs": validate.all(
validate.transform(_clientlibs_re.search),
validate.get(2),
validate.text
)
})
)
# Schema that yields the text of the "./country_code" element of the XML
# input.
_language_schema = validate.Schema(
validate.xml_findtext("./country_code")
)
_xml_to_srt_schema = validate.Schema(
validate.xml_findall(".//body/div"),
[
validate.union([validate.all(
validate.getattr("attrib"),
validate.get("{http://www.w3.org/XML/1998/namespace}lang")
),
# Captures the digits in a "playlistPosition = <digits>;" assignment.
playlist_position_re = re.compile(r"playlistPosition\s*=\s*(\d+);")
# Builds a dict from the same input text with two entries:
#   "position": int of playlist_position_re group 1, or None when not found;
#   "playlist": JSON-parsed group 1 of video_collection_re, or None when not
#               found (video_collection_re is defined outside this excerpt).
video_collection_schema = validate.Schema(
validate.union({
"position": validate.all(
validate.transform(playlist_position_re.search),
validate.any(
None,
validate.all(validate.get(1), validate.transform(int))
)
),
"playlist": validate.all(
validate.transform(video_collection_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.transform(parse_json)
)
)
)
})
)
arguments = PluginArguments(
PluginArgument(
"email",
requires=["password"],
metavar="EMAIL",
help="The email address used to register with animelab.com."
),
PluginArgument(
"password",
sensitive=True,
validate.get(2),
validate.text
)
})
)
# Extracts the text content found at the XPath "./country_code" in the
# validated XML document.
_language_schema = validate.Schema(
validate.xml_findtext("./country_code")
)
_xml_to_srt_schema = validate.Schema(
validate.xml_findall(".//body/div"),
[
validate.union([validate.all(
validate.getattr("attrib"),
validate.get("{http://www.w3.org/XML/1998/namespace}lang")
),
validate.all(
validate.xml_findall("./p"),
validate.transform(lambda x: list(enumerate(x, 1))),
[
validate.all(
validate.union({
"i": validate.get(0),
"begin": validate.all(
validate.get(1),
validate.getattr("attrib"),
validate.get("begin"),
validate.transform(lambda s: s.replace(".", ","))
),
"end": validate.all(
validate.get(1),
{
"Playlists":
{
"Playlist": [{
"@protocol": validate.text,
"url": [{"@quality": validate.text, "text": validate.url()}]
}]
}
}
},
validate.get("Video"),
validate.get("Playlists"),
validate.get("Playlist"))
# Requires a response shaped like {"token": {"authparams": <text>}} and
# yields the "authparams" text.
token_schema = validate.Schema({"token": {"authparams": validate.text}},
validate.get("token"),
validate.get("authparams"))
@classmethod
def can_handle_url(cls, url):
    """Tell whether the class-level ``url_re`` pattern matches ``url``.

    The match is anchored at the start of ``url``; the result is a plain
    ``bool``.
    """
    result = cls.url_re.match(url)
    return result is not None
def get_video_id(self):
parsed = urlparse(self.url)
qinfo = dict(parse_qsl(parsed.query or parsed.fragment.lstrip("?")))
site, video_id = None, None
url_m = self.url_re.match(self.url)
# look for the video id in the URL, otherwise find it in the page
if "tvLiveId" in qinfo:
video_id = qinfo["tvLiveId"]
site = url_m.group(1)
# Logger named after the importing module's __name__.
log = logging.getLogger(__name__)
# Plugin subclass for schoolism.com lesson/assignment pages; only the
# class-level constants and the playlist schema are visible in this excerpt.
class Schoolism(Plugin):
# Matches schoolism.com "viewAssignment.php" / "watchLesson.php" URLs and
# captures which of the two page names was used.
url_re = re.compile(r"https?://(?:www\.)?schoolism\.com/(viewAssignment|watchLesson).php")
login_url = "https://www.schoolism.com/index.php"
key_time_url = "https://www.schoolism.com/video-html/key-time.php"
# Captures the array literal assigned to "var allVideos" (DOTALL so the
# literal may span several lines).
playlist_re = re.compile(r"var allVideos\s*=\s*(\[.*\]);", re.DOTALL)
# Wraps bare "word:" keys in double quotes unless the colon is followed by
# "/" (so "http://" is left alone).
# NOTE(review): `(?!<")` is a negative lookahead for the two characters `<"`,
# not a lookbehind such as `(?<!")` - confirm which was intended.
js_to_json = partial(re.compile(r'(?!<")(\w+):(?!/)').sub, r'"\1":')
# Drops a trailing comma (and following whitespace) before a closing "}".
fix_brackets = partial(re.compile(r',\s*\}').sub, r'}')
# Searches the page text with playlist_re. The result is None when nothing
# matches; otherwise group 1 is rewritten by js_to_json and fix_brackets,
# parsed as JSON, and validated as a list of objects with a "sources" list.
playlist_schema = validate.Schema(
validate.transform(playlist_re.search),
validate.any(
None,
validate.all(
validate.get(1),
validate.transform(js_to_json),
validate.transform(fix_brackets), # remove invalid ,
validate.transform(parse_json),
[{
"sources": validate.all([{
validate.optional("playlistTitle"): validate.text,
"title": validate.text,
"src": validate.text,
"type": validate.text,
}],
# only include HLS streams
# validate.filter(lambda s: s["type"] == "application/x-mpegurl")
)
}]
)
)
""", re.VERBOSE)
# Non-greedy capture of the object literal assigned to
# window.__IPLAYER_REDUX_STATE__; DOTALL lets the literal span lines.
mediator_re = re.compile(
    r'window\.__IPLAYER_REDUX_STATE__\s*=\s*({.*?});', re.DOTALL)
# Greedy, single-line capture of the same assignment. The dot after
# "window" is escaped so it only matches a literal "." (consistent with
# mediator_re) instead of any character.
state_re = re.compile(r'window\.__IPLAYER_REDUX_STATE__\s*=\s*({.*});')
# Non-greedy capture of the object literal assigned to
# window.bbcAccount.locals; both dots are escaped for the same reason.
account_locals_re = re.compile(r'window\.bbcAccount\.locals\s*=\s*({.*?});')
# Base64-decoded byte string.
# NOTE(review): presumably the secret mixed into the "vpid_hash" field of
# api_url - the code that uses it is not visible here. The name also shadows
# the builtin `hash` in whatever scope this assignment lives.
hash = base64.b64decode(b"N2RmZjc2NzFkMGM2OTdmZWRiMWQ5MDVkOWExMjE3MTk5MzhiOTJiZg==")
# Media selector URL template with "{platform}", "{vpid}" and "{vpid_hash}"
# placeholders.
api_url = "https://open.live.bbc.co.uk/mediaselector/6/select/version/2.0/mediaset/{platform}/vpid/{vpid}/format/json/atk/{vpid_hash}/asn/1/"
# Values available for the "{platform}" placeholder of api_url.
# NOTE(review): assumed to be tried in this order - confirm against the caller.
platforms = ("pc", "iptv-all")
session_url = "https://session.bbc.com/session"
auth_url = "https://account.bbc.com/signin"
# Requires a "versions" list of objects with a text "id" and yields the
# "id" of the first version.
mediator_schema = validate.Schema(
{
"versions": [{"id": validate.text}]
},
validate.get("versions"), validate.get(0),
validate.get("id")
)
# Parses the input as JSON and validates the "media" list. Each entry's
# "connection" list is reduced to connections that have an "href", and only
# media entries whose "kind" is "video" are kept.
mediaselector_schema = validate.Schema(
validate.transform(parse_json),
{"media": [
{"connection":
validate.all([{
validate.optional("href"): validate.url(),
validate.optional("transferFormat"): validate.text
}], validate.filter(lambda c: c.get("href"))),
"kind": validate.text}
]},
validate.get("media"),
validate.filter(lambda x: x["kind"] == "video")
)
arguments = PluginArguments(