Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_add_gap():
obj = m3u8.Segment(
uri='fileSequence271.ts',
duration=4,
gap_tag=True
)
result = str(obj)
expected = '#EXTINF:4,\n#EXT-X-GAP\nfileSequence271.ts'
assert result == expected
break
else:
pkt_size = frame[2]
byterange = str(pkt_size) + '@' + frame[1]
if i < len(iframes) - 1:
extinf = float(iframes[i+1][0]) - float(frame[0])
else:
last_frame_time = ts_data[-1]['best_effort_timestamp_time']
extinf = float(last_frame_time) - float(frame[0])
segment_bytes += int(frame[2])
segment_duration += extinf
iframe_segments.append(m3u8.Segment(segment.uri,
segment.base_uri,
duration=extinf,
byterange=byterange))
return iframe_segments, segment_bytes, segment_duration
value_proc=int,
default=0
)
return await self.download(m3u8_obj.playlists[index].absolute_uri)
else:
tmp_list = m3u8.M3U8()
tmp_list.version = '3'
tmp_list.media_sequence = '0'
tmp_list.target_duration = m3u8_obj.target_duration
tmp_list.is_endlist = True
tasks = []
os.makedirs(self.cache_dir, exist_ok=True)
bar = ShadyBar(self.name, max=len(m3u8_obj.segments), suffix='%(percent).1f%% - %(eta_td)s')
for i, segment in enumerate(m3u8_obj.segments):
tmp_list.add_segment(
m3u8.Segment(
f'{os.path.realpath(self.cache_dir)}/{i}.ts',
duration=segment.duration,
base_uri='file://'
)
)
tasks.append(
asyncio.ensure_future(
self.download_segment(i, segment, bar)
)
)
tmp_list.dump(f'{self.cache_dir}/filelist.m3u8')
await asyncio.gather(*tasks)
else:
click.echo('Live streaming media is not suppported!')