Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Touch the three most recently created reports of this host: record their
# ids and stamp each one with `timestamp` as its access time.
recent_ids = []
for recent in Report.objects.filter(host=host).order_by('-created')[:3]:
    recent_ids.append(recent.id)
    recent.accessed = timestamp
    recent.save()

# Every other report of the host is deleted one by one, with progress
# reported through the progress signals.
stale_reports = Report.objects.filter(host=host).exclude(id__in=recent_ids)
stale_count = stale_reports.count()
progress_info_s.send(sender=None,
                     ptext='Cleaning {0!s} old reports'.format(stale_count),
                     plen=stale_count)
for position, stale in enumerate(stale_reports, start=1):
    stale.delete()
    progress_update_s.send(sender=None, index=position)
# Unpack the downloaded data and read the 'packages' attribute of the first
# <metadata> element, which is used as the progress-bar length.
extracted = extract(data, url)
ns = 'http://linux.duke.edu/metadata/common'
nsmap = {'ns': ns}
m_context = etree.iterparse(BytesIO(extracted),
                            tag='{{{0!s}}}metadata'.format(ns))
plen = int(next(m_context)[1].get('packages'))
# A second, independent parse yields one event per <package> element.
p_context = etree.iterparse(BytesIO(extracted),
                            tag='{{{0!s}}}package'.format(ns))
packages = set()
if plen > 0:
    ptext = 'Extracting packages: '
    progress_info_s.send(sender=None, ptext=ptext, plen=plen)
    for i, p_data in enumerate(p_context):
        elem = p_data[1]
        progress_update_s.send(sender=None, index=i + 1)
        # Use paths relative to the current <package> element. An absolute
        # '//ns:...' expression is evaluated against the whole document and
        # only returns this package's node as long as every earlier package
        # has already been cleared.
        name = elem.xpath('.//ns:name', namespaces=nsmap)[0].text.lower()
        arch = elem.xpath('.//ns:arch', namespaces=nsmap)[0].text
        fullversion = elem.xpath('.//ns:version', namespaces=nsmap)[0]
        epoch = fullversion.get('epoch')
        version = fullversion.get('ver')
        release = fullversion.get('rel')
        # Drop the element's content and its already-processed preceding
        # siblings so the parsed tree does not keep growing.
        elem.clear()
        while elem.getprevious() is not None:
            del elem.getparent()[0]
        if name != '' and version != '' and arch != '':
            # An epoch of '0' is normalised to the empty string.
            if epoch == '0':
                epoch = ''
            # NOTE(review): the visible code ends here; the step that stores
            # the parsed fields in `packages` is presumably missing - confirm.
def add_updates(updates, host):
    """ Add updates to a Host

        `updates` is treated as a mapping: every key is handed to
        process_update() together with the host and the key's value, and
        each truthy result is added to host.updates. Progress is reported
        through the progress signals. Nothing happens for an empty mapping.
    """
    ulen = len(updates)
    if ulen == 0:
        return

    ptext = '{0!s} updates'.format(str(host)[0:25])
    progress_info_s.send(sender=None, ptext=ptext, plen=ulen)
    # items must be *called*: enumerating the bound method itself raises
    # TypeError as soon as there is at least one update.
    for i, (u, sec) in enumerate(updates.items()):
        update = process_update(host, u, sec)
        if update:
            host.updates.add(update)
        progress_update_s.send(sender=None, index=i + 1)
# Resolve every reported package string and attach the resulting package to
# the host, recording its id in package_ids.
for position, pkg_str in enumerate(packages, start=1):
    package = process_package(pkg_str, report.protocol)
    if package:
        package_ids.append(package.id)
        try:
            # Each add runs in its own atomic block; database errors are
            # reported through the error signal instead of being raised.
            with transaction.atomic():
                host.packages.add(package)
        except (IntegrityError, DatabaseError) as e:
            error_message.send(sender=None, text=e)
    elif pkg_str[0].lower() != 'gpg-pubkey':
        # No message is emitted when the first field is 'gpg-pubkey'.
        info_message.send(
            sender=None,
            text='No package returned for {0!s}'.format(pkg_str))
    progress_update_s.send(sender=None, index=position)

# Detach every package of the host whose id is not listed in package_ids.
for package in host.packages.all():
    if package.id in package_ids:
        continue
    host.packages.remove(package)
def mark_security_updates():
    """ Set security=True on every PackageUpdate whose newpackage is one
        of the packages of an erratum with etype 'security'.
    """
    all_updates = PackageUpdate.objects.all()
    erratum_count = Erratum.objects.count()
    progress_info_s.send(sender=None,
                         ptext='Scanning {0!s} Errata:'.format(erratum_count),
                         plen=erratum_count)

    for position, erratum in enumerate(Erratum.objects.all(), start=1):
        progress_update_s.send(sender=None, index=position)
        if erratum.etype != 'security':
            continue
        for package in erratum.packages.all():
            # One atomic block per package; the matching updates are
            # fetched with select_for_update() inside it.
            with transaction.atomic():
                matching = all_updates.select_for_update().filter(
                    newpackage=package)
                for pkg_update in matching:
                    pkg_update.security = True
                    pkg_update.save()
def update_mirror_packages(mirror, packages):
    """ Compare the packages stored on a mirror with `packages`.

        A PackageString is built for every package stored on the mirror;
        `new` then holds the entries of `packages` that are not stored yet
        and `removals` the stored entries that are absent from `packages`.

        NOTE(review): the visible code stops after counting both sets and
        never uses the counts. The original docstring said this function
        updates the mirror and removes obsolete packages, so the step that
        applies the changes is presumably missing here - confirm.
    """
    stored_packages = mirror.packages.all()
    stored_count = stored_packages.count()
    progress_info_s.send(sender=None,
                         ptext='Obtaining stored packages: ',
                         plen=stored_count)

    old = set()
    for position, package in enumerate(stored_packages, start=1):
        progress_update_s.send(sender=None, index=position)
        name_text = str(package.name)
        arch_text = str(package.arch)
        old.add(PackageString(name=name_text,
                              epoch=package.epoch,
                              version=package.version,
                              release=package.release,
                              arch=arch_text,
                              packagetype=package.packagetype))

    new = packages.difference(old)
    removals = old.difference(packages)
    nlen = len(new)
    rlen = len(removals)
@receiver(progress_update_s)
def progress_update_r(**kwargs):
    """ Receiver that forwards the 'index' keyword argument to
        update_pbar(); a missing or falsy index is ignored.
    """
    position = kwargs.get('index')
    if not position:
        return
    update_pbar(position)