Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Install the given package. Returns True if successful, False otherwise.
"""
self.log.debug("install({})".format(name))
if self.check_package_flag(name, 'forceinstalled'):
self.log.debug("Package {} is assumed installed.".format(name))
# TODO maybe we can figure out a version string
return True
packagers = self.get_packagers(name)
if kwargs.get('static'):
self.log.debug('Package will be built statically.')
if not self.prefix_available:
self.log.error('Static builds require source builds.')
exit(1)
packagers = [self.src,]
r = recipe.get_recipe(name)
for pkgr in packagers:
self.log.debug("Trying to use packager {}".format(pkgr.name))
try:
install_result = pkgr.install(r)
except PBException as e:
self.log.error(
"Something went wrong while trying to install {} using {}: {}".format(
name, pkgr.name, str(e)
)
)
continue
if install_result:
return True
return False
def get_installed_version(self, command):
"""
Run command, see if it works. If the output has a version number in
x.y.z format, return that. If it doesn't, but the command ran, return
True. If it fails, return False. ezpz.
"""
try:
# If this fails, it almost always throws.
# NOTE: the split is to handle multi-argument commands. There's
# cases where this is not intended, e.g. it won't handle arguments
# with spaces! But currently this is preferable to running the
# command in a shell.
ver = subproc.match_output(
command.split(),
r'(?P[0-9]+\.[0-9]+(\.[0-9]+)?)',
'ver'
)
if ver is None:
self.log.debug("Could run, but couldn't find a version number.")
return True
self.log.debug("Found version number: {0}".format(ver))
return ver
except (subprocess.CalledProcessError, OSError):
# We'll assume it's not installed
return False
except Exception as e:
self.log.error("Running `{0}` failed.".format(command))
self.log.trace(str(e))
return False
def remove_recipe_dir(self, alias):
"""
Remove a recipe alias and, if applicable, its cache.
"""
if alias not in self.cfg.get_named_recipe_dirs():
self.log.error("Unknown recipe alias: {alias}".format(alias=alias))
return False
# Remove from config file
cfg_filename = self.cfg.get_named_recipe_cfg_file(alias)
cfg_file = PBConfigFile(cfg_filename)
cfg_data = cfg_file.get()
cfg_data['recipes'].pop(alias, None)
cfg_file.save(cfg_data)
recipe_cache_dir = os.path.join(
os.path.split(cfg_filename)[0],
self.cfg.recipe_cache_dir,
alias,
)
# If the recipe cache is not inside a PyBOMBS dir, don't delete it.
if self.cfg.pybombs_dir not in recipe_cache_dir:
return True
if os.path.exists(recipe_cache_dir):
self.log.info("Removing directory: {cdir}".format(cdir=recipe_cache_dir))
shutil.rmtree(recipe_cache_dir)
return True
for r_loc in args.recipes:
if r_loc:
self._recipe_locations.append(npath(r_loc))
# From environment variable:
if os.environ.get("PYBOMBS_RECIPE_DIR", "").strip():
self._recipe_locations += [
npath(x) \
for x in os.environ.get("PYBOMBS_RECIPE_DIR").split(os.pathsep) \
if len(x.strip())
]
# From prefix info:
if self._prefix_info.recipe_dir is not None:
self._recipe_locations.append(self._prefix_info.recipe_dir)
# From config files (from here, recipe locations are named):
for cfg_file in cfg_files:
recipe_locations = PBConfigFile(cfg_file).get('recipes')
for name, uri in reversed(recipe_locations.items()):
local_recipe_dir = self.resolve_recipe_uri(
uri, name, os.path.join(os.path.split(cfg_file)[0], 'recipes')
)
self._recipe_locations.append(local_recipe_dir)
self._named_recipe_dirs[name] = local_recipe_dir
self._named_recipe_sources[name] = uri
self._named_recipe_cfg_files[name] = cfg_file
# Internal recipe list:
self._recipe_locations.append(os.path.join(self.module_dir, 'recipes'))
self.log.debug("Full list of recipe locations: {0}".format(self._recipe_locations))
self.log.debug("Named recipe locations: {0}".format(self._named_recipe_sources))
# Detect Python version of the prefix (this is *not* necessarily
# the Python version that is used for executing this script! The
# prefix could be, for example, a virtualenv with its custom Python
# version.)
[self.fastcommand, "-q",
'--qf=%{NAME}\ %{VERSION}-%{RELEASE}\ %{ARCH}',
pkgname]).strip().split()
if pkg == pkgname and (pkgarch is None or arch == pkgarch):
self.log.debug("Package {0} has version {1} in {2}".format(pkgname, ver, self.fastcommand))
return ver
# exception for unpack error if package not found
except subproc.CalledProcessError:
pass
except Exception as ex:
self.log.error("Parsing `{0} list installed` failed.".format(self.fastcommand))
self.log.trace(str(ex))
return False
try:
# 'list installed' will return non-zero if package does not exist, thus will throw
out = subproc.check_output(
[self.command, "search", "-is", pkgname],
stderr=subprocess.STDOUT
).strip().split("\n")
# Output looks like this:
# |||||
# So, two steps:
# 1) Check that pkgname is correct (and, if necessary, the package arch)
# 2) return version
match_pat = r"^(?P)\s+\|\s+(?P[^\.]+)\s+\| \
\s+(?P\S+)\s+\| \
\s+(?P[0-9]+([.-][0-9a-z]+))\s+\| \
\s+(?P\S+)\s+(\d+:)"
matcher = re.compile(match_pat)
for line in out:
mobj = matcher.match(line)
if mobj and mobj.group('pkg') == pkgname:
def load_install_cache(self):
"""
Populate the installed cache.
"""
global PIP_INSTALLED_CACHE
self.log.debug("Loading pip install cache.")
PIP_INSTALLED_CACHE = {}
try:
installed_packages = \
str(subproc.check_output([self.cmd, "list"])).strip().split("\n")
for pkg in installed_packages:
mobj = re.match(r'(?P\S+)\s+\((?P[^)]+)\)', str(pkg))
if mobj is None:
continue
PIP_INSTALLED_CACHE[mobj.group('pkg')] = mobj.group('ver')
return
except subproc.CalledProcessError as e:
self.log.error("Could not run %s list. Hm.", self.cmd)
self.log.error(str(e))
except Exception as e:
self.log.error("Some error while running %s list.", self.cmd)
self.log.error(str(e))
Run setup_env_file, return the new env
FIXME make this portable!
"""
self.log.debug('Loading environment from shell script: {0}'.format(setup_env_file))
# It would be nice if we could do os.path.expandvars() with a custom
# env, wouldn't it
setup_env_file = setup_env_file.replace('${0}'.format(self.env_prefix_var), self.prefix_dir)
setup_env_file = setup_env_file.replace('${{{0}}}'.format(self.env_prefix_var), self.prefix_dir)
# TODO add some checks this is a legit script
# Damn, I hate just running stuff :/
# TODO unportable command:
separator = '<<<<<<<<<<<<<<<<<<<<<<>>>>>>>>>>>>>>>>>>>>>>'
get_env_cmd = "source {env_file} && echo '{sep}' && env".format(env_file=setup_env_file, sep=separator)
from pybombs.utils import subproc
try:
script_output = subproc.check_output(get_env_cmd, shell=True)
except subproc.CalledProcessError:
self.log.error("Trouble sourcing file {env_file}".format(env_file=setup_env_file))
raise PBException("Could not source env file.")
env_output = script_output.split(separator)[-1]
# TODO assumption is that env_output now just holds the env output
env_output = env_output.split('\n')
env = {}
for env_line in env_output:
env_line = env_line.strip()
if len(env_line) == 0:
continue
k, v = env_line.split('=', 1)
env[k] = v
return env
# Return False (0) on success, as it gets sent to exit()
return not self.install_manager.install(
self.args.packages,
mode=self.cmd,
fail_if_not_exists=self.fail_if_not_exists,
update_if_exists=self.update_if_exists,
quiet=False,
print_tree=self.args.print_tree,
deps_only=getattr(self.args, 'deps_only', False),
no_deps=self.args.no_deps,
verify=self.args.verify,
static=getattr(self.args, 'static', False),
)
### Damn, you found it :)
class Doge(CommandBase):
""" Secret woofy component of PyBOMBS """
cmds = {
'doge': 'Doge',
}
hidden = True
def __init__(self, cmd, args):
CommandBase.__init__(self,
cmd, args,
load_recipes=False,
require_prefix=False,
)
def run(self):
""" Woof, woof, woof! """
print(r" ▄ ▄ ")
def __init__(self,):
# Set up logger:
self.log = pb_logging.logger.getChild("PackageManager")
self.cfg = config_manager
self.prefix_available = self.cfg.get_active_prefix().prefix_dir is not None
# Create a source package manager
if self.prefix_available:
self.src = packagers.Source()
self.prefix = self.cfg.get_active_prefix()
else:
self.log.debug("No prefix specified. Skipping source package manager.")
self.src = None
# Create sorted list of binary package managers
requested_packagers = [x.strip() for x in self.cfg.get('packagers').split(',')]
binary_pkgrs = []
for pkgr in requested_packagers:
self.log.debug("Attempting to add binary package manager {}".format(pkgr))
p = packagers.get_by_name(pkgr, packagers.__dict__.values())
if p is None:
self.log.warn("This binary package manager can't be instantiated: {}".format(pkgr))
def extract_to(filename, path):
"""
Extract an archive into a directory. Return the prefix for the extracted files.
"""
log = pb_logging.logger.getChild("extract_to")
if tarfile.is_tarfile(filename):
archive = tarfile.open(filename)
names = archive.getnames()
elif zipfile.is_zipfile(filename):
archive = zipfile.ZipFile(filename)
names = archive.namelist()
else:
raise RuntimeError("Cannot extract {}: Unknown archive type")
log.debug("Unpacking {0}".format(filename))
if len(names) == 1:
prefix = os.path.split(archive.getnames()[0])[0]
else:
prefix = os.path.commonprefix(archive.getnames())
if not prefix:
prefix = '.'
log.debug("Common prefix: {0}".format(prefix))