Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _extract_from_json_url(self, json_url, video_id, lang, title=None):
    """Download the player JSON and derive the video's title and upload date string.

    Parameters:
        json_url: URL of the JSON document; its 'videoJsonPlayer' object is read.
        video_id: id used for the download and as the fallback in the error message.
        lang: not referenced in the part of the method covered here.
        title: optional fallback title, used when the JSON carries no 'VTI'.

    Raises:
        ExtractorError (expected=True) when the player info has no usable
        'VSR' dict. The message is the server supplied 'custom_msg' text when
        that is flagged as an error, otherwise a generic "not available" text.

    NOTE: only the leading part of the method is covered by this block; the
    construction of the result dict follows it.
    """
    info = self._download_json(json_url, video_id)
    player_info = info['videoJsonPlayer']

    vsr = try_get(player_info, lambda x: x['VSR'], dict)
    if not vsr:
        error = None
        if try_get(player_info, lambda x: x['custom_msg']['type']) == 'error':
            error = try_get(
                player_info, lambda x: x['custom_msg']['msg'], compat_str)
        if not error:
            # The parentheses matter: `%` binds tighter than `or`, so without
            # them the formatted (always truthy) string wins and the video_id
            # fallback is never used, yielding "Video None is not available".
            error = 'Video %s is not available' % (player_info.get('VID') or video_id)
        raise ExtractorError(error, expected=True)

    # Prefer the shooting date; otherwise take the part of 'VRA'/'VDA' before
    # the first space. Not consumed inside this span.
    upload_date_str = player_info.get('shootingDate')
    if not upload_date_str:
        upload_date_str = (player_info.get('VRA') or player_info.get('VDA') or '').split(' ')[0]

    title = (player_info.get('VTI') or title or player_info['VID']).strip()
    # `or ''` also covers a 'VSU' key that is present but null, matching the
    # way 'VRA'/'VDA' are guarded above.
    subtitle = (player_info.get('VSU') or '').strip()
    if subtitle:
        title += ' - %s' % subtitle
info_dict = {
'id': player_info['VID'],
'title': title,
def str_or_none(v, default=None):
    """Return ``compat_str(v)``, or ``default`` when ``v`` is None.

    Only ``None`` triggers the default; other falsy values (0, '', ...) are
    converted like any other value.
    """
    if v is None:
        return default
    return compat_str(v)
}
# Add the default extra info to the extraction result, then either hand it to
# process_ie_result (when `process` is truthy) or return it unprocessed.
self.add_default_extra_info(ie_result, ie, url)
if process:
return self.process_ie_result(ie_result, download, extra_info)
else:
return ie_result
# Geo-restricted video: report the exception's message, extended with the
# countries the video is available in (when the exception carries any) and a
# VPN/proxy hint.
except GeoRestrictedError as e:
msg = e.msg
if e.countries:
msg += '\nThis video is available in %s.' % ', '.join(
map(ISO3166Utils.short2full, e.countries))
msg += '\nYou might want to use a VPN or a proxy server (with --proxy) to workaround.'
self.report_error(msg)
break
except ExtractorError as e: # An error we somewhat expected
self.report_error(compat_str(e), e.format_traceback())
break
# MaxDownloadsReached is re-raised untouched; listing it before the generic
# `except Exception` keeps it out of the 'ignoreerrors' handling below.
except MaxDownloadsReached:
raise
# Any other exception is reported (with traceback) and swallowed only when the
# 'ignoreerrors' param is set; otherwise it propagates.
except Exception as e:
if self.params.get('ignoreerrors', False):
self.report_error(error_to_compat_str(e), tb=encode_compat_str(traceback.format_exc()))
break
else:
raise
# NOTE(review): presumably the `else` of an enclosing loop (the `break`s above
# would skip it); the loop header is not visible here - confirm.
else:
self.report_error('no suitable InfoExtractor for URL %s' % url)
# Get an appKey/uuid pair for requesting the session key.
# The appKey is the hex string passed to constant('_appGridApplicationKey', ...)
# inside gigya_sc.
appKey = self._search_regex(
r'constant\s*\(\s*["\']_appGridApplicationKey["\']\s*,\s*["\']([0-9a-f]+)',
gigya_sc, 'appKey')
# A freshly generated random UUID is sent along with the appKey.
session_json = self._download_json(
'https://appgrid-api.cloud.accedo.tv/session',
video_id, 'Downloading session keys', query={
'appKey': appKey,
'uuid': compat_str(uuid.uuid4()),
})
# Fetch the general and web configuration using the session key obtained above.
paths = self._download_json(
'https://appgrid-api.cloud.accedo.tv/metadata/general_configuration,%20web_configuration',
video_id, 'Downloading paths JSON',
query={'sessionKey': compat_str(session_json['sessionKey'])})
ooyala_s = paths['general_configuration']['api_configuration']['ooyala_search']
# The search endpoint is assembled from the configured base URL, path and
# provider id; only the '_source' of the first hit is used.
source = self._download_json(
'http://%s%s%s/docs/%s' % (
ooyala_s['base_url'], ooyala_s['full_path'],
ooyala_s['provider_id'], video_id),
video_id, 'Downloading data JSON', query={
'include_titles': 'Series,Season',
'product_name': 'test',
'format': 'full',
})['hits']['hits'][0]['_source']
# First embed code of the first offer.
embedCode = source['offers'][0]['embed_codes'][0]
# Prefer the medium title; 'title_long' is required when it is missing or empty.
titles = source['localizable_titles'][0]
title = titles.get('title_medium') or titles['title_long']
def _extract_video_info(self, video_info):
# Build an info dict from the `video_info` mapping and collect formats from
# its renditions. 'id' and 'displayName' are mandatory (plain indexing);
# every other field is read with .get().
# NOTE: this block is cut off inside the rendition loop, so the return value
# is not visible here.
video_id = compat_str(video_info['id'])
publisher_id = video_info.get('publisherId')
info = {
'id': video_id,
'title': video_info['displayName'].strip(),
'description': video_info.get('shortDescription'),
# The still image is preferred over the thumbnail URL.
'thumbnail': video_info.get('videoStillURL') or video_info.get('thumbnailURL'),
'uploader': video_info.get('publisherName'),
# A falsy publisher id (missing, None, 0, '') yields None.
'uploader_id': compat_str(publisher_id) if publisher_id else None,
# Both values are passed to the helpers together with 1000
# (presumably a millisecond-to-second scale - confirm against the helpers).
'duration': float_or_none(video_info.get('length'), 1000),
'timestamp': int_or_none(video_info.get('creationDate'), 1000),
}
# NOTE(review): the [] defaults only apply when a key is absent; a key present
# with a null value would make the `+` raise TypeError.
renditions = video_info.get('renditions', []) + video_info.get('IOSRenditions', [])
if renditions:
formats = []
for rend in renditions:
url = rend['defaultURL']
# Renditions without a URL are skipped.
if not url:
continue
ext = None
if rend['remote']:
url_comp = compat_urllib_parse_urlparse(url)
# Remote renditions whose path ends in .m3u8 are handled separately.
if url_comp.path.endswith('.m3u8'):
formats.extend(
# The incoming URL is sent as the Referer of the webpage request built below.
headers['Referer'] = url
# Extract ID from URL
mobj = re.match(self._VALID_URL, url)
video_id = mobj.group('id')
orig_url = url
# Normalise the URL: matches with a 'pro' or 'player' group are fetched from
# player.vimeo.com, while play_redirect_hls / moogaloop.swf URLs are fetched
# from the plain vimeo.com page.
if mobj.group('pro') or mobj.group('player'):
url = 'https://player.vimeo.com/video/' + video_id
elif any(p in url for p in ('play_redirect_hls', 'moogaloop.swf')):
url = 'https://vimeo.com/' + video_id
# Retrieve video webpage to extract further information
request = sanitized_Request(url, headers=headers)
try:
webpage, urlh = self._download_webpage_handle(request, video_id)
redirect_url = compat_str(urlh.geturl())
# Some URLs redirect to ondemand pages, which this extractor cannot
# handle directly, so they are passed on to the ondemand extractor
# (e.g. https://vimeo.com/73445910)
if VimeoOndemandIE.suitable(redirect_url):
return self.url_result(redirect_url, VimeoOndemandIE.ie_key())
# An HTTP 403 whose body contains the privacy-settings message is turned into
# an expected error asking for the embedding page's URL; other ExtractorErrors
# reach the bare `raise` at the end.
except ExtractorError as ee:
if isinstance(ee.cause, compat_HTTPError) and ee.cause.code == 403:
errmsg = ee.cause.read()
if b'Because of its privacy settings, this video cannot be played here' in errmsg:
raise ExtractorError(
'Cannot download embed-only video without embedding '
'URL. Please call youtube-dl with the URL of the page '
'that embeds this video.',
expected=True)
raise
def sanitize_string_field(info, string_field):
    """Force ``info[string_field]`` to be a ``compat_str``, in place.

    Missing or None values and values that already are ``compat_str`` are
    left alone. Anything else is reported through report_force_conversion
    (expected to be in the enclosing scope) and then replaced by its
    ``compat_str`` conversion. Returns None.
    """
    value = info.get(string_field)
    needs_conversion = value is not None and not isinstance(value, compat_str)
    if needs_conversion:
        report_force_conversion(string_field, 'a string', 'string')
        info[string_field] = compat_str(value)
# The playlist is the JSON array assigned to window.pagePlaylist in the page;
# the pattern requires the assignment to end with ';' followed by a newline.
playlist = self._parse_json(
self._search_regex(
r'window\.pagePlaylist\s*=\s*(\[.+?\]);\n', webpage, 'page playlist'),
video_id)
def _mrss_url(item):
return item['mrss'] + item.get('mrssvars', '')
# News pages contain a single video in the playlist whose id differs from
# video_id, so a one-element playlist is used without comparing ids.
if len(playlist) == 1:
return self._get_videos_info_from_url(_mrss_url(playlist[0]), video_id)
# Otherwise pick the first entry whose id, compared as a string, equals video_id.
for item in playlist:
item_id = item.get('id')
if item_id and compat_str(item_id) == video_id:
return self._get_videos_info_from_url(_mrss_url(item), video_id)
# Fallback when the preceding lookup (not visible here) raises IndexError:
# the next fragment time is taken to be `duration`.
except IndexError:
next_fragment_time = duration
# The span up to the next fragment is split evenly across the repetitions.
fragment_ctx['duration'] = (next_fragment_time - fragment_ctx['time']) / fragment_repeat
# One entry per repetition: the URL is the track pattern with its
# {start time} / {start_time} placeholder replaced by the current time, the
# duration is divided by stream_timescale, and the running time advances by
# the (unscaled) fragment duration.
for _ in range(fragment_repeat):
fragments.append({
'url': re.sub(r'{start[ _]time}', compat_str(fragment_ctx['time']), track_url_pattern),
'duration': fragment_ctx['duration'] / stream_timescale,
})
fragment_ctx['time'] += fragment_ctx['duration']
# The format id is the optional ism_id and stream_name plus tbr, joined by '-'.
format_id = []
if ism_id:
format_id.append(ism_id)
if stream_name:
format_id.append(stream_name)
format_id.append(compat_str(tbr))
formats.append({
'format_id': '-'.join(format_id),
'url': ism_url,
'manifest_url': ism_url,
'ext': 'ismv' if stream_type == 'video' else 'isma',
'width': width,
'height': height,
'tbr': tbr,
'asr': sampling_rate,
# fourcc is used as the codec of the stream's own type; the other codec is 'none'.
'vcodec': 'none' if stream_type == 'audio' else fourcc,
'acodec': 'none' if stream_type == 'video' else fourcc,
'protocol': 'ism',
'fragments': fragments,
'_download_params': {
'duration': duration,
# Only the second element returned by extract_timezone is kept (rebinding
# date_str) - presumably the date string with its timezone part removed.
_, date_str = extract_timezone(date_str)
# Try every candidate format; formats that do not match are silently skipped.
# NOTE(review): there is no `break` after a successful parse, so a later
# format that also matches overrides an earlier result - confirm this is intended.
for expression in date_formats(day_first):
try:
upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
except ValueError:
pass
# Fallback: parse as an RFC 2822 style date via email.utils; the first six
# tuple fields (year .. second) are fed to datetime.
if upload_date is None:
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
try:
upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
except ValueError:
pass
# Nothing is returned explicitly when no parser succeeded.
if upload_date is not None:
return compat_str(upload_date)