Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def import_installed_packages(conn):
    """ Attach packages listed in the installed_pkgs table to their hosts

    Reads every row of `installed_pkgs` through `conn`. For each row whose
    `host_id` matches an existing Host, the PackageArchitecture and Package
    described by the row are fetched (or created) and the package is added
    to that host's packages. Rows that refer to an unknown host are skipped.

    conn: database connection; its cursor() is called with
          MySQLdb.cursors.DictCursor so columns can be read by name.
    """
    cursor = conn.cursor(MySQLdb.cursors.DictCursor)
    try:
        cursor.execute('SELECT * FROM installed_pkgs')
        rows = cursor.fetchall()
        pbar = progress_bar('Installed Packages', len(rows))
        for count, row in enumerate(rows, 1):
            # update the bar first so skipped rows still count as progress
            pbar.update(count)
            try:
                host = Host.objects.get(id=row['host_id'])
            except Host.DoesNotExist:
                continue
            arch, created = PackageArchitecture.objects.get_or_create(
                name=row['arch'])
            package_name = PackageName.objects.get(id=row['pkg_id'])
            package, created = Package.objects.get_or_create(
                name=package_name,
                version=row['version'],
                release=row['rel'],
                arch=arch)
            host.packages.add(package)
    finally:
        # release the cursor even when the query or an ORM call raises
        cursor.close()
def clean_tags():
    """ Delete unused tags

    A tagged item counts as unused when its object_id does not match the
    primary key of any Host. A progress bar is shown while the unused items
    are deleted; nothing is displayed when there are none.
    """
    def _host_missing(item):
        # tags are only used for hosts for now
        try:
            Host.objects.get(pk=item.object_id)
        except ObjectDoesNotExist:
            return True
        return False

    all_items = list(TaggedItem.objects.all())
    unused = [item for item in all_items if _host_missing(item)]

    total = len(unused)
    if total == 0:
        return

    create_pbar('Removing {0!s} unused tagged items'.format(total), total)
    for done, item in enumerate(unused, 1):
        item.delete()
        update_pbar(done)
def get_host(host=None, action='Performing action'):
    """ Helper function to get a single host object

    Looks first for a Host whose hostname starts with `host` followed by a
    dot and, if there is none, for one whose hostname starts with `host`.

    host: hostname, or leading part of a hostname, to look up
    action: description of what the caller is doing; used in the message
            printed when the module-level `verbose` flag is set

    Returns the matching Host, or None when no hostname was given, no host
    matches, or more than one host matches.
    """
    host_obj = None
    if host is None:
        # the default argument would otherwise fail on `host + '.'`
        message = 'No host specified'
    else:
        message = '{0!s} for host {1!s}'.format(action, host)
        try:
            try:
                host_obj = Host.objects.get(hostname__startswith=host + '.')
            except ObjectDoesNotExist:
                # fall back to a plain prefix match
                host_obj = Host.objects.get(hostname__startswith=host)
        except ObjectDoesNotExist:
            message = 'Host {0!s} does not exist'.format(host)
        except MultipleObjectsReturned:
            # either lookup may be ambiguous; report how many hosts share
            # the prefix instead of letting the exception escape
            matches = Host.objects.filter(hostname__startswith=host).count()
            message = '{0!s} hosts match hostname "{1!s}"'.format(matches,
                                                                  host)
    if verbose:
        # single-argument call form prints the same text on Python 2 and 3
        print(message)
    return host_obj