Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def extract_yast_packages(data):
""" Extract package metadata from yast metadata file
"""
extracted = extract(data, 'gz')
pkgs = re.findall(b'=Pkg: (.*)', extracted)
plen = len(pkgs)
packages = set()
if plen > 0:
ptext = 'Extracting packages: '
progress_info_s.send(sender=None, ptext=ptext, plen=plen)
for i, pkg in enumerate(pkgs):
progress_update_s.send(sender=None, index=i + 1)
name, version, release, arch = str(pkg).split()
package = PackageString(name=name.lower(),
epoch='',
version=version,
release=release,
arch=arch,
packagetype='R')
packages.add(package)
else:
info_message.send(sender=None, text='No packages found in repo')
return packages
def add_updates(updates, host):
""" Add updates to a Host
"""
ulen = len(updates)
if ulen > 0:
ptext = '{0!s} updates'.format(str(host)[0:25])
progress_info_s.send(sender=None, ptext=ptext, plen=ulen)
for i, (u, sec) in enumerate(updates.items):
update = process_update(host, u, sec)
if update:
host.updates.add(update)
progress_update_s.send(sender=None, index=i + 1)
def extract_arch_packages(data):
""" Extract package metadata from an arch linux tarfile
"""
from packages.utils import find_evr
extracted = BytesIO(extract(data, 'gz'))
tf = tarfile.open(fileobj=extracted, mode='r:*')
packages = set()
plen = len(tf.getnames())
if plen > 0:
ptext = 'Extracting packages: '
progress_info_s.send(sender=None, ptext=ptext, plen=plen)
for i, tarinfo in enumerate(tf):
progress_update_s.send(sender=None, index=i + 1)
if tarinfo.isfile():
name_sec = ver_sec = arch_sec = False
t = tf.extractfile(tarinfo).read()
for line in t.decode('utf-8').splitlines():
if line.startswith('%NAME%'):
name_sec = True
continue
if name_sec:
name_sec = False
name = line
continue
if line.startswith('%VERSION%'):
ver_sec = True
continue
def extract_deb_packages(data, url):
""" Extract package metadata from debian Packages file
"""
extracted = extract(data, url)
package_re = re.compile(b'^Package: ', re.M)
plen = len(package_re.findall(extracted))
packages = set()
if plen > 0:
ptext = 'Extracting packages: '
progress_info_s.send(sender=None, ptext=ptext, plen=plen)
bio = BytesIO(extracted)
for i, stanza in enumerate(Packages.iter_paragraphs(bio)):
# https://github.com/furlongm/patchman/issues/55
if 'version' not in stanza:
continue
fullversion = Version(stanza['version'])
arch = stanza['architecture']
name = stanza['package']
epoch = fullversion._BaseVersion__epoch
if epoch is None:
epoch = ''
version = fullversion._BaseVersion__upstream_version
release = fullversion._BaseVersion__debian_revision
if release is None:
release = ''
strpackage = PackageString(name=name,
epoch=package.epoch,
version=package.version,
release=package.release,
arch=arch,
packagetype=package.packagetype)
old.add(strpackage)
new = packages.difference(old)
removals = old.difference(packages)
nlen = len(new)
rlen = len(removals)
ptext = 'Removing {0!s} obsolete packages:'.format(rlen)
progress_info_s.send(sender=None, ptext=ptext, plen=rlen)
for i, package in enumerate(removals):
progress_update_s.send(sender=None, index=i + 1)
package_id = PackageName.objects.get(name=package.name)
epoch = package.epoch
version = package.version
release = package.release
arch = PackageArchitecture.objects.get(name=package.arch)
packagetype = package.packagetype
p = Package.objects.get(name=package_id,
epoch=epoch,
version=version,
arch=arch,
release=release,
packagetype=packagetype)
from repos.models import MirrorPackage
with transaction.atomic():