Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _log_tags(self, tags, message, limit=8):
    """Log a tag count followed by a shortened preview of the tags.

    Two info records are written: the first states how many tags there
    are together with ``message``; the second lists at most ``limit``
    tags rendered as ``name (score)`` and ends with ``, ...`` when more
    tags exist than were shown.

    :param tags: sized iterable of ``(tag, score)`` pairs
    :param message: text appended to the count line
    :param limit: maximum number of tags to include in the preview
    """
    total = len(tags)
    log.info('%s: %s tag(s) %s:', self, total, message)

    # every pair is rendered before truncating to the preview length
    rendered = [f'{tag} ({score})' for tag, score in tags]
    if total > limit:
        ellipsis = ', ...'
    else:
        ellipsis = ''
    log.info('%s: %s%s', self, ', '.join(rendered[:limit]), ellipsis)
def print_toplist(merged):
    """Log the first ten entries of ``merged`` on a single line.

    Each entry is rendered as ``name: score (percent%)``, where the
    percentage is the entry's score relative to the score of the first
    entry, truncated to an integer.

    ``"None"`` is logged instead when the list cannot be rendered: it is
    empty or not subscriptable, an entry is not a ``(name, score)`` pair,
    a score is not numeric, or the first score is zero.

    :param merged: sequence of ``(name, score)`` pairs; the first entry
        supplies the 100% reference score
    """
    try:
        # convert the reference score once instead of once per entry
        topscore = float(merged[0][1])
        toplist = [
            "{0}: {1} ({2}%)".format(
                name, score, int(float(score) / topscore * 100.0)
            )
            for name, score in merged[:10]
        ]
    except (LookupError, TypeError, ValueError, ArithmeticError):
        # only data problems are treated as "nothing to show"; anything
        # else (e.g. a programming error) is allowed to surface
        log.info("None")
    else:
        log.info(", ".join(toplist))
def process_track_tags(self):
    """
    Turn the collected toptags into metadata for a single track.

    Called once all last.fm data has been received. Optionally prints
    debug statistics, then merges the artist and track toptags using the
    configured track weights and hands the result to
    ``filter_and_set_metadata``.
    """
    meta = self.metadata
    log.info(
        '>>> process track tags: %s - %s',
        meta['tracknumber'], meta['title'],
    )

    if settings.DEBUG_STATS_TRACK:
        for source in ('track', 'artist'):
            self.print_toptag_stats('track', source)

    # get complete, balanced, sorted list (high first) of tags
    weights = settings.CONFIG['track']['weight']
    weighted_sources = [
        (self.toptags[source], weights[source])
        for source in ('artist', 'track')
    ]
    all_tags = apply_tag_weight(*weighted_sources)

    self.filter_and_set_metadata('track', all_tags)
else:
metadata["genre"] = sorted(new_genre)
log.debug('WIKIDATA: setting cache genre to : %s ' % genre_list)
self.cache[item_id] = genre_list
else:
log.debug('WIKIDATA: genre not found in wikidata')
log.debug('WIKIDATA: seeing if we can finalize tags...')
album = self.itemAlbums[item_id]
album._requests -= 1
if not album._requests:
self.itemAlbums = {k: v for k, v in self.itemAlbums.items() if v != album}
album._finalize_loading(None)
log.info('WIKIDATA: total remaining requests: %s' % album._requests)
if not self.itemAlbums:
self.requests.clear()
log.info('WIKIDATA: Finished (B)')
# category is done; get metatag name for the category
metatag = category.get_metatag(scope)
log.debug('%s: metatag: %s', category, metatag)
# some categories aren't valid for all scopes (eg occasion in album)
if metatag is None:
log.debug('%s: no tag for scope %s', category, scope)
else:
value = join_tags(
result[category.name],
limit=category.limit,
separator=category.separator,
sort=category.sort,
apply_titlecase=category.titlecase
)
self.metadata[metatag] = value or settings.DEFAULT_UNKNOWN
log.info('%s: saving: %s = %s', category, metatag, self.metadata[metatag])
def process_album_tags(self):
"""
this is called after all last.fm data is received to process the
collected data for album tags.
"""
log.info(
'>>> process album tags: %s - %s',
self.metadata['albumartist'], self.metadata['album'],
)
if settings.DEBUG_STATS_ALBUM:
self.print_toptag_stats('album', 'album', len(self.tracks))
self.print_toptag_stats('album', 'all_artist')
self.print_toptag_stats('album', 'all_track')
# get complete, balanced, sorted list (high first) of tags:
album_weight = settings.CONFIG['album']['weight']['album'] * len(self.tracks)
all_track_weight = settings.CONFIG['album']['weight']['all_track']
all_artist_weight = settings.CONFIG['album']['weight']['all_artist']
# album tag score gets multiplied by the total number of tracks
# in the release to even out weight of all_* tags before merging
all_tags = apply_tag_weight(
(self.toptags['album'], album_weight),
def load(self, priority=False, refresh=False):
    """Reset the album to its loading state and open a new request cycle.

    Does nothing except log a message while ``self._requests`` is still
    non-zero. Otherwise the status bar is updated, the album (and its
    release group, if any) is marked as not loaded, existing metadata and
    folksonomy tags are cleared, and fresh containers for the incoming
    data are created.

    NOTE(review): ``priority`` and ``refresh`` are not read by the
    statements in this block - confirm whether they are used elsewhere.
    """
    if self._requests:
        log.info("Not reloading, some requests are still active.")
        return

    self.tagger.window.set_statusbar_message(
        N_('Loading album %(id)s ...'), {'id': self.id}
    )

    # mark the album itself as being (re)loaded
    self.loaded = False
    self.status = _("[loading album information]")

    # the release group is invalidated together with the album
    release_group = self.release_group
    if release_group:
        release_group.loaded = False
        release_group.folksonomy_tags.clear()

    # drop the old data and refresh before starting over
    self.metadata.clear()
    self.folksonomy_tags.clear()
    self.update()

    # fresh containers for the incoming data; one request is now counted
    self._new_metadata = Metadata()
    self._new_tracks = []
    self._requests = 1
def dispatch(self, tagtype, params):
    """
    Route a toptag lookup through the cache or to a new api request.

    The url-encoded ``params`` serve as the cache key. Keys found in
    ``CACHE`` or ``PENDING`` are queued as tasks handled by
    ``handle_cached_toptags``; any other key is queued as an
    http-request handled by ``handle_toptags``.
    """
    query = urlencode(params, quote_via=quote, safe="/',!@:$")
    log.info('cache key for reading: %s', query)

    if query in CACHE:
        # already cached: only a task needs to be queued
        hit_message = 'cached %s'
    elif query in PENDING:
        # already queued as a request; by the time this task is
        # processed the actual query will have stored data in the cache
        hit_message = 'pending %s'
    else:
        # new queries are queued as http-requests
        log.debug('request %s', query)
        self.add_request(partial(self.handle_toptags, tagtype), params)
        return

    log.debug(hit_message, query)
    self.add_task(partial(self.handle_cached_toptags, tagtype, query))
if not category.is_enabled:
continue
filtered_tags = category.filter_tags(all_tags)
# use extend, because of how overflow works,
# directly writing to results
result[category.name].extend(filtered_tags[:category.limit])
overflow = filtered_tags[category.limit:]
# if an overflow is configured, put the toptags, that exceed the
# limit in the category configured for overflow
if category.overflow and overflow:
# the overflowed toptags are not considered in the threshold
# calculation of that category, they are put directly into
# the result list.
log.info(
'%s: overflow to %s: %s',
category,
category.overflow,
', '.join([f'{t} ({s})' for t, s in overflow]) or 'None'
)
if overflow:
result[category.overflow] = overflow
# if a prepend-category is configured copy the tags from that
# category in front of this one
# TODO this works only "downstream" eg from grouping to genre,
# not the other way round
if category.prepend:
log.info(
'%s: prepending from %s: %s',
category,
def process_release(self, album, metadata, release):
    """Pass a release's release group and album artists to ``process_request``.

    Stores the album's webservice and log on the instance, then calls
    ``process_request`` once for the first release group id (item type
    ``release-group``) and once for every album artist id (item type
    ``artist``).

    A release without a release group id no longer raises ``IndexError``;
    the release-group lookup is skipped and the artists are still
    processed.

    :param album: album object providing ``tagger.webservice`` and ``log``
    :param metadata: metadata object queried with ``getall``
    :param release: not used by this method
    """
    self.ws = album.tagger.webservice
    self.log = album.log

    release_group_ids = metadata.getall('musicbrainz_releasegroupid')
    if release_group_ids:
        item_id = release_group_ids[0]
        log.info('WIKIDATA: Processing release group %s ', item_id)
        self.process_request(metadata, album, item_id, item_type='release-group')
    else:
        log.info('WIKIDATA: no release group id in metadata, skipping release group')

    for artist_id in metadata.getall('musicbrainz_albumartistid'):
        log.info('WIKIDATA: Processing release artist %s', artist_id)
        self.process_request(metadata, album, artist_id, item_type='artist')