Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_endswith_newline():
obj = m3u8.loads(playlists.SIMPLE_PLAYLIST)
manifest = obj.dumps()
assert manifest.endswith('#EXT-X-ENDLIST\n')
def get_playlist(self, filename):
data = open(get_path(__file__, os.path.join('data', filename))).read()
return m3u8.loads(data)
# Get the M3u8 file - this is where rate limiting has been happening
video_host, playlist = self.__get_playlist(token)
if playlist.is_variant:
print('[+] Multiple resolutions found. Slurping all resolutions.')
for plist in playlist.playlists:
resolution = str(plist.stream_info.resolution[0]) + 'x' + str(plist.stream_info.resolution[1])
resolution_file = Path(self.storage) / Path(resolution + '.mp4')
print('[+] Downloading ' + resolution)
playlist_url = video_host + plist.uri
ts_m3u8_response = self.requests.get(playlist_url, headers = {'Authorization': None})
ts_m3u8_parse = m3u8.loads(ts_m3u8_response.text)
ts_list = []
ts_full_file_list = []
for ts_uri in ts_m3u8_parse.segments.uri:
# ts_list.append(video_host + ts_uri)
ts_file = requests.get(video_host + ts_uri)
fname = ts_uri.split('/')[-1]
ts_path = Path(self.storage) / Path(fname)
ts_list.append(ts_path)
ts_path.write_bytes(ts_file.content)
ts_full_file = Path(self.storage) / Path(resolution + '.ts')
ts_full_file = str(ts_full_file)
def _download_video(video_id, args):
if args.start and args.end and args.end <= args.start:
raise ConsoleError("End time must be greater than start time")
print_out("Looking up video...")
video = twitch.get_video(video_id)
print_out("Found: {} by {}".format(
video['title'], video['channel']['display_name']))
print_out("Fetching access token...")
access_token = twitch.get_access_token(video_id)
print_out("Fetching playlists...")
playlists = twitch.get_playlists(video_id, access_token)
parsed = m3u8.loads(playlists)
selected = _select_quality(parsed.playlists)
print_out("\nFetching playlist...")
response = requests.get(selected.uri)
response.raise_for_status()
playlist = m3u8.loads(response.text)
base_uri = re.sub("/[^/]+$", "/", selected.uri)
target_dir = _crete_temp_dir(base_uri)
filenames = list(_get_files(playlist, args.start, args.end))
# Save playlists for debugging purposes
with open(target_dir + "playlists.m3u8", "w") as f:
f.write(playlists)
with open(target_dir + "playlist.m3u8", "w") as f:
f.write(response.text)
def get_live_stream(self):
import m3u8
token, sig = self.get_token_and_signature()
r = random.randint(0, 1E7)
url = USHER_API.format(channel=self.streamer, sig=sig, token=token, random=r)
r = requests.get(url)
m3u8_obj = m3u8.loads(r.text)
return m3u8_obj, url
self.yuu_logger.debug('Title: {}'.format(output_name))
self.m3u8_url_list = hls_list
self.yuu_logger.debug('Requesting m3u8 list')
r = self.session.get(hls_list)
r2 = self.session.get(hls_list2)
self.yuu_logger.debug('m3u8 requested')
if r.status_code == 403:
return None, 'This video is geo-locked for Japan only.'
self.yuu_logger.debug('Parsing m3u8')
r_all = m3u8.loads(r.text)
r2_all = m3u8.loads(r2.text)
band_list_v4 = []
for v4d in r_all.playlists:
s_info = v4d.stream_info
audio_inf = s_info.audio.strip('audio')
if _resolution[-2:] == audio_inf:
band_list_v4.append((s_info.bandwidth, str(s_info.resolution[1]) + audio_inf))
for v3d in r2_all.playlists:
bw = v3d.stream_info.bandwidth
for v4d in band_list_v4:
bwv4, resv4 = v4d
if bw == bwv4:
self.m3u8_url = v3d.uri
self.resolution = resv4
self.est_filesize = round(bw / 1024 / 5, 2)
def get_stream_playlist(self, master_url):
fn_name = 'get_stream_playlist'
playlists = {}
try:
r = self.__session.get(master_url)
if r.status_code != 200:
raise self.NetworkError(fn_name, self.NETWORK_ERR_NON_200, r.status_code)
playlist_obj = m3u8.loads(r.text)
if playlist_obj.is_variant:
for playlist in playlist_obj.playlists:
bitrate = str(int(playlist.stream_info.bandwidth) / 1000)
playlists[bitrate] = master_url[:master_url.rfind('/') + 1] + playlist.uri + '|' + urllib.urlencode(self.__playlist_headers)
else:
playlists['0'] = master_url + '|' + urllib.urlencode(self.__playlist_headers)
except requests.exceptions.ConnectionError as error:
raise self.NetworkError(fn_name, error)
return playlists
def load_m3u8(src, url=None, headers=None):
# is playlist raw text, a path to a file, or a url?
if src.upper().startswith('#EXTM3U'):
m3u8_obj = m3u8.loads(src)
m3u8_obj.playlist_url = url
m3u8_obj.base_uri = _parsed_url(url)
elif os.path.exists(src):
m3u8_obj = m3u8.load(src)
m3u8_obj.playlist_url = url
m3u8_obj.base_uri = _parsed_url(url)
else:
m3u8_obj = m3u8.load(src, headers=headers or DEFAULT_HEADERS)
m3u8_obj.playlist_url = src
return m3u8_obj