Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _move_artifact(self, source_file, dest_file):
    """Move a single artifact from ``source_file`` to ``dest_file``.

    Returns immediately (with ``None``) when ``source_file`` is not on
    disk. Otherwise the move is logged, performed with
    ``beets.util.move``, and ``beets.util.prune_dirs`` is then called on
    the directory that contained ``source_file``.

    ``dest_file`` must provide ``.decode('utf8')``: its decoded basename
    is what appears in the log line.
    """
    # Guard: some other plugin may already have moved the file away.
    source_missing = not os.path.exists(source_file)
    if source_missing:
        return

    artifact_name = os.path.basename(dest_file.decode('utf8'))
    self._log.info(u'Moving artifact: {0}'.format(artifact_name))

    beets.util.move(source_file, dest_file)

    # Prune starting at the artifact's former directory, handing over
    # the configured 'clutter' setting.
    old_dir, _ = os.path.split(source_file)
    clutter_names = config['clutter'].as_str_seq()
    beets.util.prune_dirs(old_dir, clutter=clutter_names)
def remove_item(self, item):
    """Remove the file belonging to ``item`` and drop its path entry.

    The path comes from ``self.get_path(item)``. It is passed to
    ``util.remove`` and then to ``util.prune_dirs`` with
    ``self.directory`` as ``root``. Finally the ``self.path_key`` entry
    is deleted from ``item``.
    """
    target = self.get_path(item)

    util.remove(target)
    util.prune_dirs(target, root=self.directory)

    # The item should no longer reference the removed file.
    key = self.path_key
    del item[key]
import musicbrainzngs
import re
import traceback
from six.moves.urllib.parse import urljoin
from beets import logging
import beets.autotag.hooks
import beets
from beets import util
from beets import config
import six
# Presumably the MusicBrainz ID of the "Various Artists" artist -- the
# name suggests so; confirm against MusicBrainz.
VARIOUS_ARTISTS_ID = '89ad4ac3-39f7-470e-963a-56509c546377'

# Use the HTTPS endpoint only when `util.SNI_SUPPORTED` is truthy;
# otherwise fall back to plain HTTP.
BASE_URL = (
    'https://musicbrainz.org/' if util.SNI_SUPPORTED
    else 'http://musicbrainz.org/'
)

# NOTE(review): looks like a list of track titles to ignore -- verify
# against the code that consumes it.
SKIPPED_TRACKS = ['[data track]']

# Register this application with musicbrainzngs at import time.
musicbrainzngs.set_useragent('beets', beets.__version__, 'http://beets.io/')
class MusicBrainzAPIError(util.HumanReadableException):
"""An error while talking to MusicBrainz. The `query` field is the
parameter to the action and may have any type.
"""
def __init__(self, reason, verb, query, tb=None):
self.query = query
subpath = unicodedata.normalize('NFD', subpath)
else:
subpath = unicodedata.normalize('NFC', subpath)
if beets.config['asciify_paths']:
subpath = util.asciify_path(
subpath,
beets.config['path_sep_replace'].as_str()
)
maxlen = beets.config['max_filename_length'].get(int)
if not maxlen:
# When zero, try to determine from filesystem.
maxlen = util.max_filename_length(self._db.directory)
subpath, fellback = util.legalize_path(
subpath, self._db.replacements, maxlen,
os.path.splitext(self.path)[1], fragment
)
if fellback:
# Print an error message if legalization fell back to
# default replacements because of the maximum length.
log.warning(
u'Fell back to default replacements when naming '
u'file {}. Configure replacements to avoid lengthening '
u'the filename.',
subpath
)
if fragment:
return util.as_string(subpath)
else:
"""
super(PathQuery, self).__init__(field, pattern, fast)
# By default, the case sensitivity depends on the filesystem
# that the query path is located on.
if case_sensitive is None:
path = util.bytestring_path(util.normpath(pattern))
case_sensitive = beets.util.case_sensitive(path)
self.case_sensitive = case_sensitive
# Use a normalized-case pattern for case-insensitive matches.
if not case_sensitive:
pattern = pattern.lower()
# Match the path as a single file.
self.file_path = util.bytestring_path(util.normpath(pattern))
# As a directory (prefix).
self.dir_path = util.bytestring_path(os.path.join(self.file_path, b''))
log.debug(u'moving album art {0} to {1}',
util.displayable_path(old_art),
util.displayable_path(new_art))
if copy:
util.copy(old_art, new_art)
elif link:
util.link(old_art, new_art)
elif hardlink:
util.hardlink(old_art, new_art)
else:
util.move(old_art, new_art)
self.artpath = new_art
# Prune old path when moving.
if not copy:
util.prune_dirs(os.path.dirname(old_art),
self._db.directory)
def _play_command(self, lib, opts, args):
"""The CLI command function for `beet play`. Create a list of paths
from query, determine if tracks or albums are to be played.
"""
use_folders = config['play']['use_folders'].get(bool)
relative_to = config['play']['relative_to'].get()
if relative_to:
relative_to = util.normpath(relative_to)
# Perform search by album and add folders rather than tracks to
# playlist.
if opts.album:
selection = lib.albums(ui.decargs(args))
paths = []
sort = lib.get_default_album_sort()
for album in selection:
if use_folders:
paths.append(album.item_dir())
else:
paths.extend(item.path
for item in sort.sort(album.items()))
item_type = 'album'
# Perform item query and add tracks to playlist.
def move_file(self, dest, operation=MoveOperation.MOVE):
"""Move, copy, link or hardlink the item's depending on `operation`,
updating the path value if the move succeeds.
If a file exists at `dest`, then it is slightly modified to be unique.
`operation` should be an instance of `util.MoveOperation`.
"""
if not util.samefile(self.path, dest):
dest = util.unique_path(dest)
if operation == MoveOperation.MOVE:
plugins.send("before_item_moved", item=self, source=self.path,
destination=dest)
util.move(self.path, dest)
plugins.send("item_moved", item=self, source=self.path,
destination=dest)
elif operation == MoveOperation.COPY:
util.copy(self.path, dest)
plugins.send("item_copied", item=self, source=self.path,
destination=dest)
elif operation == MoveOperation.LINK:
util.link(self.path, dest)
plugins.send("item_linked", item=self, source=self.path,
destination=dest)
elif operation == MoveOperation.HARDLINK:
util.hardlink(self.path, dest)
plugins.send("item_hardlinked", item=self, source=self.path,
destination=dest)
# Either copying or moving succeeded, so update the stored path.
def parse(self, string):
    """Parse ``string`` into a numeric value.

    ``util.raw_seconds_short`` is tried first (short clock notation
    converted back to seconds), then a plain ``float``. A converter that
    raises ``ValueError`` is skipped; when both do, ``self.null`` is
    returned. Any other exception propagates.
    """
    # Converters in priority order: clock notation, then a bare number.
    for convert in (util.raw_seconds_short, float):
        try:
            return convert(string)
        except ValueError:
            continue
    return self.null
# Write tags from the database to the converted file.
item.try_write(path=converted, id3v23=id3v23)
if keep_new:
# If we're keeping the transcoded file, read it again (after
# writing) to get new bitrate, duration, etc.
item.path = converted
item.read()
item.store() # Store new path and audio data.
if self.config['embed'] and not linked:
album = item.get_album()
if album and album.artpath:
self._log.debug(u'embedding album art from {}',
util.displayable_path(album.artpath))
art.embed_item(self._log, item, album.artpath,
itempath=converted, id3v23=id3v23)
if keep_new:
plugins.send('after_convert', item=item,
dest=dest, keepnew=True)
else:
plugins.send('after_convert', item=item,
dest=converted, keepnew=False)