Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
quality = max([q['width'], q['height']])
stream_type = self.quality2_2_id[quality]
stream_profile = self.id_2_profile[stream_type]
info.stream_types.append(stream_type)
info.streams[stream_type] = {
'container': 'm3u8',
'video_profile': stream_profile,
'src': [q[url]],
'size': 0
}
else:
add_header('Referer', 'https://www.acfun.cn/')
if m3u8Info is None:
data = json.loads(get_content('https://www.acfun.cn/rest/pc-direct/play/playInfo/m3u8Auto?videoId={}'.format(sourceVid)))
m3u8Info = data['playInfo']['streams'][0]
# some videos are broken of CDN, random choose one
m3u8api = random.choice(m3u8Info['playUrls'])
self.logger.warning('Request m3u8 info via CDN: %s\nIf video has broken on this CDN, please retry.', m3u8api)
lines = get_content(m3u8api)
self.logger.debug('m3u8 api: %s', lines)
lines = lines.split('\n')
resolutions = None
for line in lines:
if resolutions is None:
resolutions = match1(line, 'RESOLUTION=(\d+x\d+)')
if resolutions:
resolutions = [int(q) for q in resolutions.split('x')]
elif match1(line, '(\.m3u8)'):
def prepare(self):
add_default_handler(HTTPCookieProcessor)
install_default_handlers()
add_header('Referer', self.url)
add_header('User-Agent', ua)
info = VideoInfo(self.name)
if self.url and not self.vid:
self.vid = match1(self.url, 'https?://www.mgtv.com/b/\d+/(\d+).html')
if self.vid is None:
html = get_content(self.url)
self.vid = match1(html, 'vid=(\d+)', 'vid=\"(\d+)', 'vid: (\d+)')
did = str(uuid.uuid4())
tk2 = generate_tk2(did)
api_info_url = 'https://pcweb.api.mgtv.com/player/video?tk2={}&video_id={}&type=pch5'.format(tk2, self.vid)
meta = json.loads(get_content(api_info_url))
assert meta['code'] == 200, '[failed] code: {}, msg: {}'.format(meta['code'], meta['msg'])
assert meta['data'], '[Failed] Video info not found.'
pm2 = meta['data']['atc']['pm2']
info.title = meta['data']['info']['title']
api_source_url = 'https://pcweb.api.mgtv.com/player/getSource?pm2={}&tk2={}&video_id={}&type=pch5'.format(pm2, tk2, self.vid)
meta = json.loads(get_content(api_source_url))
def gettmts(tvid, vid):
    """Fetch the ``/tmts/`` record for a video and return it as parsed JSON.

    The request goes to ``https://cache.m.iqiyi.com`` and carries three query
    parameters: a fixed ``src`` identifier, the current timestamp ``t`` in
    milliseconds, and a signature ``sc`` obtained by passing the
    concatenation ``str(t) + key + vid`` to the ``md5`` helper.

    Args:
        tvid: First path component of the request URL.
        vid: Second path component; it is also concatenated into the signed
            string, so it has to be a ``str``.

    Returns:
        The response body decoded with ``json.loads``.
    """
    timestamp_ms = int(time.time() * 1000)
    secret = 'd5fb4bd9d50c4be6948c97edd7254b0e'
    # The signature covers the timestamp, the fixed key and the video id.
    signature = md5(str(timestamp_ms) + secret + vid)
    query = urlencode({
        'src': '76f90cbd92f94a2e925d83e8ccd22cb7',
        'sc': signature,
        't': timestamp_ms
    })
    path = '/tmts/{}/{}/?{}'.format(tvid, vid, query)
    response = get_content('{}{}'.format('https://cache.m.iqiyi.com', path))
    return json.loads(response)
ID = match1(self.url, '/(\d+)')
api1_data = json.loads(get_content(api1_url.format(ID)))
if api1_data['code'] == 0:
self.vid = api1_data['data']['room_id']
else:
self.logger.debug('Get room ID from API failed: %s', api1_data['msg'])
self.vid = ID
api2_data = json.loads(get_content(api2_url.format(self.vid)))
assert api2_data['code'] == 0, api2_data['msg']
api2_data = api2_data['data']
assert api2_data['live_status'] == 1, u'主播正在觅食......'
info.title = title = api2_data['title']
api3_data = json.loads(get_content(api3_url.format(self.vid)))
if api3_data['code'] == 0:
info.artist = artist = api3_data['data']['info']['uname']
info.title = '{} - {}'.format(title, artist)
def get_live_info(q=0):
params = {
'player': 1,
'cid': self.vid,
'platform': 'html5',
'quality': q,
'otype': 'json'
}
data = json.loads(get_content(api_url + urlencode(params)))
assert data['code'] == 0, data['message']
def get_video_info(qn=0):
# need login with "qn=112"
if int(qn) > 80:
return
api_url = self.get_api_url(qn)
html = get_content(api_url)
self.logger.debug('HTML> ' + html)
code = match1(html, '<code>([^<])')
if code:
return
urls, size, fmt, qlt, aqlts = parse_cid_playurl(html)
if 'mp4' in fmt:
ext = 'mp4'
elif 'flv' in fmt:
ext = 'flv'
st, prf = self.format_2_type_profile[fmt]
if urls and st not in info.streams:
info.stream_types.append(st)
info.streams[st] = {'container': ext, 'video_profile': prf, 'src' : urls, 'size': size}
if qn == 0:
'k_err_retries': 0,
'up': '',
'qd_v': 2,
'tm': tm,
'qdy': 'a',
'qds': 0,
'ut': 0, # 600 bid isn't available
# relative to encode
#'k_ft1': ,
#'k_ft4': ,
#'k_ft5': ,
}
src = '/dash?{}'.format(urlencode(params))
vf = cmd5x(src)
req_url = '{}{}&vf={}'.format(host, src, vf)
html = get_content(req_url)
return json.loads(html)
def prepare(self):
info = VideoInfo(self.name)
self.vid = match1(self.url, 'play/(\d+)')
html = get_content(self.url)
if not self.vid:
self.vid = match1(html, 'data-vid="(\d+)')
title = match1(html, '<h1 class="video-title">(.+?)</h1>')
info.artist = artist = match1(html, '<div class="video-author">[\s\S]+?<h3>(.+?)</h3>')
info.title = u'{} - {}'.format(title, artist)
t1 = int(time.time() * 1000)
t2 = t1 + random.randrange(5, 10)
rnd = str(random.random()).replace('.', '')
params = {
'callback': 'jQuery1124{}_{}'.format(rnd, t1),
'r': 'vhuyaplay/video',
'vid': self.vid,
'format': 'mp4,m3u8',
'_': t2
}
def prepare_list(self):
    """Build the list of episode page URLs referenced by ``self.url``.

    The page is downloaded with ``get_content`` and searched for every
    ``"epList":[...]`` array. Each ``"id"`` / ``"ep_id"`` number found inside
    those arrays is turned into a
    ``https://www.bilibili.com/bangumi/play/ep<id>`` URL, in order of
    appearance.

    Returns:
        A list of episode URLs, or ``None`` when the page contains no
        ``epList`` array.
    """
    html = get_content(self.url)
    # Raw strings keep the regex escapes away from Python's own string-escape
    # handling; the non-raw forms trigger invalid-escape warnings.
    ep_blobs = matchall(html, [r'"epList":(\[.*?\])'])
    if not ep_blobs:
        return None
    # Flatten the per-array matches in a single pass instead of repeatedly
    # concatenating lists with sum(..., []).
    ep_ids = [
        eid
        for blob in ep_blobs
        for eid in matchall(blob, [r'"(?:ep_)?id":(\d+),'])
    ]
    return ['https://www.bilibili.com/bangumi/play/ep{}'.format(eid) for eid in ep_ids]
host = 'http://cache.video.qiyi.com'
params = {
'tvid': tvid,
'vid': vid,
'v': 0,
'qypid': '{}_12'.format(tvid),
'src': '01012001010000000000',
't': tm,
'k_tag': 1,
'k_uid': get_macid(),
'rs': 1,
}
src = '/vps?{}'.format(urlencode(params))
vf = md5x(src)
req_url = '{}{}&vf={}'.format(host, src, vf)
html = get_content(req_url)
return json.loads(html)
try:
for title, fmt_name, stream_profile, type_name, urls, size, rate in self.get_streams_info():
stream_id = self.stream_2_id[fmt_name]
if urls and stream_id not in info.stream_types:
info.stream_types.append(stream_id)
info.streams[stream_id] = {
'container': type_name,
'video_profile': stream_profile,
'src' : urls,
'size': size
}
video_rate[stream_id] = rate
break
except AssertionError as e:
if 'wrong vid' in str(e):
html = get_content(self.url)
self.vid = match1(html,
'&vid=(\w+)',
'vid:\s*[\"\'](\w+)',
'vid\s*=\s*[\"\']\s*(\w+)',
'"vid":"(\w+)"')
continue
raise e
if self.vip:
self.logger.warning('This is a VIP video!')
#self.limit = False
assert len(info.stream_types), "can't play this video!!"
info.stream_types = sorted(info.stream_types, key = self.stream_ids.index)
info.title = title