Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_not_found_repo_packages(cls, pkg_lines: List[str]) -> List[str]:
not_found_pkg_names = []
pkg_names_to_check: List[str] = []
for pkg_name in pkg_lines:
if pkg_name in cls._pacman_repo_pkg_present_cache:
if not cls._pacman_repo_pkg_present_cache[pkg_name]:
not_found_pkg_names.append(pkg_name)
else:
pkg_names_to_check += pkg_name.split(',')
if not pkg_names_to_check:
return not_found_pkg_names
results = spawn(
get_pacman_command() + ['--sync', '--print-format=%%'] + pkg_names_to_check
).stderr_text.splitlines()
new_not_found_pkg_names = []
for result in results:
for pattern in (PATTERN_NOTFOUND, PATTERN_DB_NOTFOUND, ):
groups = pattern.findall(result)
if groups:
pkg_name = VersionMatcher(groups[0]).pkg_name
new_not_found_pkg_names.append(pkg_name)
for pkg_name in pkg_names_to_check:
cls._pacman_repo_pkg_present_cache[pkg_name] = pkg_name not in new_not_found_pkg_names
return not_found_pkg_names + new_not_found_pkg_names
def mkdir(to_path) -> None:
mkdir_result = spawn(isolate_root_cmd(['mkdir', '-p', to_path]))
if mkdir_result.returncode != 0:
print_stdout(mkdir_result.stdout_text)
print_stderr(mkdir_result.stderr_text)
raise Exception(_(f"Can't create destination directory '{to_path}'."))
def get_upgradeable_package_names() -> List[str]:
upgradeable_packages_output = spawn(
get_pacman_command() + ['--query', '--upgrades', '--quiet']
).stdout_text
if not upgradeable_packages_output:
return []
return upgradeable_packages_output.splitlines()
def _info_packages_thread_repo() -> str:
args = parse_args()
return spawn(get_pacman_command() + args.raw).stdout_text
def copy_aur_repo(from_path, to_path) -> None:
from_path = os.path.realpath(from_path)
to_path = os.path.realpath(to_path)
if not os.path.exists(to_path):
mkdir(to_path)
from_paths = []
for src_path in glob(f'{from_path}/*') + glob(f'{from_path}/.*'):
if os.path.basename(src_path) != '.git':
from_paths.append(src_path)
to_path = f'{to_path}/'
cmd_args = isolate_root_cmd(['cp', '-r'] + from_paths + [to_path])
result = spawn(cmd_args)
if result.returncode != 0:
if os.path.exists(to_path):
remove_dir(to_path)
mkdir(to_path)
result = interactive_spawn(cmd_args)
if result.returncode != 0:
raise Exception(_(f"Can't copy '{from_path}' to '{to_path}'."))
def create_dirs() -> None:
if running_as_root():
# Let systemd-run setup the directories and symlinks
true_cmd = isolate_root_cmd(['true'])
result = spawn(true_cmd)
if result.returncode != 0:
raise Exception(result)
# Chown the private CacheDirectory to root to signal systemd that
# it needs to recursively chown it to the correct user
os.chown(os.path.realpath(CACHE_ROOT), 0, 0)
if not os.path.exists(CACHE_ROOT):
os.makedirs(CACHE_ROOT)
migrate_old_aur_repos_dir()
if not os.path.exists(AUR_REPOS_CACHE_PATH):
os.makedirs(AUR_REPOS_CACHE_PATH)
def cli_print_version() -> None:
args = parse_args()
pacman_version = spawn(
[PikaurConfig().misc.PacmanPath.get_str(), '--version', ],
).stdout_text.splitlines()[1].strip(' .-')
print_version(pacman_version, quiet=args.quiet)
def get_pacman_test_output(cls, cmd_args: List[str]) -> List[VersionMatcher]:
if not cmd_args:
return []
cache_index = ' '.join(sorted(cmd_args))
cached_pkg = cls._pacman_test_cache.get(cache_index)
if cached_pkg is not None:
return cached_pkg
results: List[VersionMatcher] = []
not_found_packages_output = spawn(
# pacman --deptest flag conflicts with some --sync options:
get_pacman_command(ignore_args=[
'overwrite'
]) + ['--deptest', ] + cmd_args
).stdout_text
if not not_found_packages_output:
cls._pacman_test_cache[cache_index] = results
return results
for line in not_found_packages_output.splitlines():
try:
version_matcher = VersionMatcher(line)
except ValueError:
print_stderr(line)
continue
else:
results.append(version_matcher)