Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_play_live(self):
self.requests_mock.get.side_effect = [
make_response(open(get_path(__file__, "data/play_live/playlist_1.m3u"), mode='rb').read()),
Mock(content=sentinel.data_1),
Mock(content=sentinel.data_2),
Mock(content=sentinel.data_3),
make_response(open(get_path(__file__, "data/play_live/playlist_2.m3u"), mode='rb').read()),
Mock(content=sentinel.data_4),
]
sleep_mock = Mock()
playlist_uri = 'http://80.188.78.151:80/atip/fd2eccaa99022586e14694df91068915/1449324471384/' \
'3616440c710a1d7e3f54761a6d940c64/2402-tv-pc/1502.m3u8'
playlist = Playlist(playlist_uri, {'bandwidth': 500000}, None, None)
play_live(playlist, 'my_player "Custom Argument" --', _sleep=sleep_mock)
calls = [
call.get('http://80.188.78.151:80/atip/fd2eccaa99022586e14694df91068915/1449324471384/'
'3616440c710a1d7e3f54761a6d940c64/2402-tv-pc/1502.m3u8'),
call.get('http://80.188.78.151:80/atip/fd2eccaa99022586e14694df91068915/1449324471384/'
'3616440c710a1d7e3f54761a6d940c64/2402-tv-pc/1502/58877187.ts'),
call.get('http://80.188.78.151:80/atip/fd2eccaa99022586e14694df91068915/1449324471384/'
'3616440c710a1d7e3f54761a6d940c64/2402-tv-pc/1502/58877188.ts'),
call.get('http://80.188.78.151:80/atip/fd2eccaa99022586e14694df91068915/1449324471384/'
'3616440c710a1d7e3f54761a6d940c64/2402-tv-pc/1502/58877189.ts'),
call.get('http://80.188.78.151:80/atip/fd2eccaa99022586e14694df91068915/1449324471384/'
'3616440c710a1d7e3f54761a6d940c64/2402-tv-pc/1502.m3u8'),
call.get('http://80.188.78.151:80/atip/fd2eccaa99022586e14694df91068915/1449324471384/'
'3616440c710a1d7e3f54761a6d940c64/2402-tv-pc/1502/58877190.ts'),
def test_run_player_live(self):
    """Check that ``run_player`` invokes the player command for a live playlist.

    The quoted ``"Custom Argument"`` is expected to reach the mocked call as
    a single list item, with the playlist URI appended as the last item.
    """
    uri = 'http://example.cz/path/playlist.m3u8'
    live_playlist = Playlist(uri, {'bandwidth': 500000}, None, None)

    run_player(live_playlist, 'my_player "Custom Argument"')

    # Exactly one invocation is expected, carrying the full argument list.
    expected_calls = [call(['my_player', 'Custom Argument', uri])]
    self.assertEqual(self.call_mock.mock_calls, expected_calls)
def test_get_playlist(self):
    """Check that ``get_playlist`` returns a ``Playlist`` with the expected URI and bandwidth.

    Three HTTP endpoints are stubbed through ``responses``: the POST that
    answers with the client playlist URL, the GET for the client playlist
    JSON fixture, and the GET for the stream playlist fixture.
    """
    client_playlist_url = 'https://www.ceskatelevize.cz/ivysilani/client-playlist/?key=df365c9c2ea8b36f76dfa29e3b16d245'
    expected_uri = ('http://80.188.78.151:80/atip/fd2eccaa99022586e14694df91068915/1449324471384/'
                    '3616440c710a1d7e3f54761a6d940c64/2402-tv-pc/1502.m3u8')

    with responses.RequestsMock() as http_mock:
        http_mock.add(
            responses.POST,
            'https://www.ceskatelevize.cz/ivysilani/ajax/get-client-playlist/',
            json={'url': client_playlist_url},
        )
        http_mock.add(
            responses.GET,
            'https://www.ceskatelevize.cz/ivysilani/client-playlist/',
            body=get_content(get_path(__file__, 'data/play_live/client_playlist.json')),
        )
        http_mock.add(
            responses.GET,
            'http://80.188.65.18:80/cdn/uri/get/',
            body=get_content(get_path(__file__, 'data/play_live/stream_playlist.m3u')),
        )
        # The call must happen while the stubs are active.
        result = get_playlist(sentinel.playlist_id, PLAYLIST_TYPE_CHANNEL, 0)

    self.assertIsInstance(result, Playlist)
    self.assertEqual(result.uri, expected_uri)
    self.assertEqual(result.stream_info.bandwidth, 500000)
for segment in self.data.get('segments', []) ])
#self.keys = get_uniques([ segment.key for segment in self.segments ])
for attr, param in self.simple_attributes:
setattr(self, attr, self.data.get(param))
self.files = []
for key in self.keys:
# Avoid None key, it could be the first one, don't repeat them
if key and key.uri not in self.files:
self.files.append(key.uri)
self.files.extend(self.segments.uri)
self.media = MediaList([ Media(base_uri=self.base_uri, **media)
for media in self.data.get('media', []) ])
self.playlists = PlaylistList([ Playlist(base_uri=self.base_uri, media=self.media, **playlist)
for playlist in self.data.get('playlists', []) ])
self.iframe_playlists = PlaylistList()
for ifr_pl in self.data.get('iframe_playlists', []):
self.iframe_playlists.append(IFramePlaylist(base_uri=self.base_uri,
uri=ifr_pl['uri'],
iframe_stream_info=ifr_pl['iframe_stream_info'])
)
self.segment_map = self.data.get('segment_map')
start = self.data.get('start', None)
self.start = start and Start(**start)
server_control = self.data.get('server_control', None)
self.server_control = server_control and ServerControl(**server_control)