Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def stop(self):
if self.port > 0:
log.debug("Stopping the browser integration")
self.port = 0
self.tagger.listen_port_changed.emit(self.port)
self.close()
else:
log.debug("Browser integration inactive, no need to stop")
def load_album(self, album_id, discid=None):
album_id = self.mbid_redirects.get(album_id, album_id)
album = self.albums.get(album_id)
if album:
log.debug("Album %s already loaded.", album_id)
album.add_discid(discid)
return album
album = Album(album_id, discid=discid)
self.albums[album_id] = album
self.album_added.emit(album)
album.load()
return album
def _load(self, filename):
log.debug("Loading file %r", filename)
self.__casemap = {}
file = self._File(encode_filename(filename))
metadata = Metadata()
if file.tags:
for origname, values in file.tags.items():
name_lower = origname.lower()
if (values.kind == mutagen.apev2.BINARY
and name_lower.startswith("cover art")):
if b'\0' in values.value:
descr, data = values.value.split(b'\0', 1)
try:
coverartimage = TagCoverArtImage(
file=filename,
tag=name_lower,
data=data,
)
dirpath, filepaths = self._get_existing_paths(plugin_name, _FILEEXTS)
if dirpath:
if os.path.islink(dirpath):
log.debug("Removing symlink %r", dirpath)
os.remove(dirpath)
elif os.path.isdir(dirpath):
log.debug("Removing directory %r", dirpath)
shutil.rmtree(dirpath)
if filepaths:
for filepath in filepaths:
log.debug("Removing file %r", filepath)
os.remove(filepath)
if with_update:
update = filepath + _UPDATE_SUFFIX
if os.path.isfile(update):
log.debug("Removing file %r", update)
os.remove(update)
def _load_check(self, filename):
# Check that file has not been removed since thread was queued
# Don't load if we are stopping.
if self.state != File.PENDING:
log.debug("File not loaded because it was removed: %r", self.filename)
return None
if self.tagger.stopping:
log.debug("File not loaded because %s is stopping: %r", PICARD_APP_NAME, self.filename)
return None
return self._load(filename)
log.debug('WIKIDATA: Looking up cache for item: %s' % item_id)
log.debug('WIKIDATA: Album request count: %s' % album._requests)
log.debug('WIKIDATA: Item type %s' % item_type)
if item_id in self.cache:
log.debug('WIKIDATA: Found item in cache')
genre_list = self.cache[item_id]
new_genre = set(metadata.getall("genre"))
new_genre.update(genre_list)
#sort the new genre list so that they don't appear as new entries (not a change) next time
metadata["genre"] = self.genre_delimiter.join(sorted(new_genre))
return
else:
# pending requests are handled by adding the metadata object to a
# list of things to be updated when the genre is found
if item_id in self.itemAlbums:
log.debug(
'WIKIDATA: Request already pending, add it to the list of items to update once this has been'
'found')
self.requests[item_id].append(metadata)
else:
self.requests[item_id] = [metadata]
self.itemAlbums[item_id] = album
album._requests += 1
log.debug('WIKIDATA: First request for this item')
log.debug('WIKIDATA: About to call Musicbrainz to look up %s ' % item_id)
path = '/ws/2/%s/%s' % (item_type, item_id)
queryargs = {"inc": "url-rels"}
self.ws.get(self.mb_host, self.mb_port, path, partial(self.musicbrainz_release_lookup, item_id,
metadata),
# category in front of this one
# TODO this works only "downstream" eg from grouping to genre,
# not the other way round
if category.prepend:
log.info(
'%s: prepending from %s: %s',
category,
category.prepend,
', '.join([f'{t} ({s})' for t, s in overflow]) or 'None'
)
prepend_ = result[category.prepend]
result[category.name] = prepend_ + result[category.name]
# category is done; get metatag name for the category
metatag = category.get_metatag(scope)
log.debug('%s: metatag: %s', category, metatag)
# some categories aren't valid for all scopes (eg occasion in album)
if metatag is None:
log.debug('%s: no tag for scope %s', category, scope)
else:
value = join_tags(
result[category.name],
limit=category.limit,
separator=category.separator,
sort=category.sort,
apply_titlecase=category.titlecase
)
self.metadata[metatag] = value or settings.DEFAULT_UNKNOWN
log.info('%s: saving: %s = %s', category, metatag, self.metadata[metatag])
coverartimage.set_data(data)
if coverartimage.can_be_saved_to_metadata:
log.debug("Cover art image stored to metadata: %r [%s]" % (
coverartimage,
coverartimage.imageinfo_as_string())
)
self.metadata.images.append(coverartimage)
for track in self.album._new_tracks:
track.metadata.images.append(coverartimage)
# If the image already was a front image,
#Â there might still be some other non-CAA front
#Â images in the queue - ignore them.
if not self.front_image_found:
self.front_image_found = coverartimage.is_front_image()
else:
log.debug("Thumbnail for cover art image: %r [%s]" % (
coverartimage,
coverartimage.imageinfo_as_string())
)
except CoverArtImageIOError as e:
self.album.error_append(e)
self.album._finalize_loading(error=True)
raise e
except CoverArtImageIdentificationError as e:
self.album.error_append(e)
def _load(self, filename):
log.debug("Loading file %r", filename)
self.__casemap = {}
file = ASF(encode_filename(filename))
metadata = Metadata()
for name, values in file.tags.items():
if name == 'WM/Picture':
for image in values:
try:
(mime, data, type_, description) = unpack_image(image.value)
except ValueError as e:
log.warning('Cannot unpack image from %r: %s',
filename, e)
continue
try:
coverartimage = TagCoverArtImage(
file=filename,
tag=name,
def _remove_plugin_files(self, plugin_name, with_update=False):
plugin_name = strip_zip_suffix(plugin_name)
log.debug("Remove plugin files and dirs : %r", plugin_name)
dirpath, filepaths = self._get_existing_paths(plugin_name, _FILEEXTS)
if dirpath:
if os.path.islink(dirpath):
log.debug("Removing symlink %r", dirpath)
os.remove(dirpath)
elif os.path.isdir(dirpath):
log.debug("Removing directory %r", dirpath)
shutil.rmtree(dirpath)
if filepaths:
for filepath in filepaths:
log.debug("Removing file %r", filepath)
os.remove(filepath)
if with_update:
update = filepath + _UPDATE_SUFFIX
if os.path.isfile(update):
log.debug("Removing file %r", update)
os.remove(update)