Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Handle copying of http:// and ftp:// debian repos.
"""
# warn about not having mirror program.
mirror_program = "/usr/bin/debmirror"
if not os.path.exists(mirror_program):
utils.die(self.logger, "no %s found, please install it" % (mirror_program))
cmd = "" # command to run
# detect cases that require special handling
if repo.rpm_list != "" and repo.rpm_list != []:
utils.die(self.logger, "has_rpm_list not yet supported on apt repos")
if not repo.arch:
utils.die(self.logger, "Architecture is required for apt repositories")
# build destination path for the repo
dest_path = os.path.join("/var/www/cobbler/repo_mirror", repo.name)
if repo.mirror_locally:
# NOTE: Dropping the @@suite@@ replacement, as it was also dropped from
# manage_import_debian_ubuntu.py because repo has no os_version
# attribute. If it is added again it will break the Web UI!
# mirror = repo.mirror.replace("@@suite@@",repo.os_version)
mirror = repo.mirror
idx = mirror.find("://")
method = mirror[:idx]
def run_cmd(cmd):
    """
    Run a command without going through a shell.

    The command string is split into an argument list with shlex.split()
    and handed to utils.subprocess_sp() with shell=False.

    @param cmd str command
    @return str output
    @raise Exception if return code is not 0
    """
    print("run cmd: %s" % cmd)
    args = shlex.split(cmd)
    (output, rc) = utils.subprocess_sp(None, args, shell=False)
    if rc != 0:
        # Include the failing command and its exit status so the error is
        # diagnosable; the exception type callers catch is unchanged.
        raise Exception("command failed (rc=%s): %s" % (rc, cmd))
    return output
if _DEBUG:
logger.debug("%s does not exist yet - retrying (try %s)" % (fn, count))
time.sleep(1)
mtime = os.stat(fn).st_mtime
key = None
needpush = True
if _DEBUG:
logger.debug("check_push(%s)" % fn)
if fn in db:
if db[fn][0] < mtime:
if _DEBUG:
logger.debug("mtime differ - old: %s new: %s" % (db[fn][0], mtime))
if os.path.exists(fn):
cmd = '/usr/bin/sha1sum %s'%fn
key = utils.subprocess_get(None,cmd).split(' ')[0]
if _DEBUG:
logger.debug("checking checksum - old: %s new: %s" % (db[fn][1], key))
if key == db[fn][1]:
needpush = False
else:
needpush = False
if key is None:
if os.path.exists(fn):
cmd = '/usr/bin/sha1sum %s'%fn
key = utils.subprocess_get(None,cmd).split(' ')[0]
if _DEBUG:
logger.debug("push(%s) ? %s" % (fn, needpush))
if needpush:
# reset the Result var
ProxySync.ResultLock.acquire()
if edit_type != "remove":
# FIXME: this doesn't know about interfaces yet!
# if object type is system and fields add to dict and then
# modify when done, rather than now.
imods = {}
# FIXME: needs to know about how to delete interfaces too!
for (k, v) in list(attributes.items()):
if object_type != "system" or not self.__is_interface_field(k):
# in place modifications allow for adding a key/value pair while keeping other k/v
# pairs intact.
if k in ["autoinstall_meta", "kernel_options", "kernel_options_post", "template_files", "boot_files", "fetchable_files", "params"] and \
"in_place" in attributes and attributes["in_place"]:
details = self.get_item(object_type, object_name)
v2 = details[k]
(ok, input) = utils.input_string_or_dict(v)
for (a, b) in list(input.items()):
if a.startswith("~") and len(a) > 1:
del v2[a[1:]]
else:
v2[a] = b
v = v2
self.modify_item(object_type, handle, k, v, token)
else:
modkey = "%s-%s" % (k, attributes.get("interface", ""))
imods[modkey] = v
if object_type == "system":
if "delete_interface" not in attributes and "rename_interface" not in attributes:
self.modify_system(handle, 'modify_interface', imods, token)
def run(api,args,logger):
settings = api.settings()
name = args[0] # name of distro
target = api.find_distro(name)
# collapse the object down to a rendered datastructure
# the second argument set to false means we don't collapse hashes/arrays into a flat string
target = utils.blender(api, False, target)
if target == {}:
raise CX("failure looking up target: %s" % target)
# Create metadata for the templar function
# Right now, just using img_path, but adding more
# cobbler variables here would probably be good
metadata = {}
metadata["img_path"] = os.path.join(utils.tftpboot_location(),"images",name)
# Create the templar instance
templater = templar.Templar()
# Loop through the hash of fetchable files,
# executing a cp for each one
for file in target["fetchable_files"].keys():
a distro snippet, and a general snippet. If no snippet is located, it
returns None.
"""
for snipclass in ('system', 'profile', 'distro'):
if self.varExists('%s_name' % snipclass):
fullpath = '%s/per_%s/%s/%s' % (self.getVar('autoinstall_snippets_dir'),
snipclass, file,
self.getVar('%s_name' % snipclass))
try:
contents = utils.read_file_contents(fullpath, fetch_if_remote=True)
return contents
except FileNotFoundException:
pass
try:
return "#errorCatcher ListErrors\n" + utils.read_file_contents('%s/%s' % (self.getVar('autoinstall_snippets_dir'), file), fetch_if_remote=True)
except FileNotFoundException:
return None
def get_valid_archs(self, token=None):
    """
    List the valid architectures, sorted.

    The list is obtained from utils.get_valid_archs(), sorted in place and
    passed through self.xmlrpc_hacks() before being returned.
    """
    self._log("get_valid_archs", token=token)
    archs = utils.get_valid_archs()
    # Sort in place (rather than building a copy) so the list object that
    # utils handed back ends up ordered as well.
    archs.sort()
    return self.xmlrpc_hacks(archs)
def sync_dhcp(self):
    """
    Restart the dnsmasq service unless the restart_dhcp setting is "0".

    The setting is compared as a lower-cased string, so any value whose
    string form is not "0" triggers the restart.

    @raise CX if "service dnsmasq restart" exits with a non-zero code
    """
    # Restart disabled by configuration: nothing to do.
    if str(self.settings.restart_dhcp).lower() == "0":
        return

    # A zero return code means the restart succeeded.
    if utils.subprocess_call(self.logger, "service dnsmasq restart") == 0:
        return

    failure = "service dnsmasq restart failed"
    self.logger.error(failure)
    raise CX(failure)
def get_file_lines(self, filename):
    """
    Get lines from a file, which may or may not be compressed.

    The file type is decided from the output of /usr/bin/file: output
    containing "gzip" is read through gzip.open(), output containing
    "text" is read with open(). Any other type, or a gzip file that cannot
    be read, yields an empty list.

    @param filename str path of the file to read
    @return list lines read from the file, or [] if nothing could be read
    """
    # NOTE(review): filename is interpolated unquoted into the command
    # string; confirm how utils.subprocess_get treats paths containing
    # spaces or shell metacharacters.
    ftype = utils.subprocess_get(self.logger, "/usr/bin/file %s" % filename)

    if "gzip" in ftype:
        try:
            import gzip
            # The with-block closes the handle even when readlines() fails.
            with gzip.open(filename, 'r') as f:
                return f.readlines()
        except Exception:
            # Best effort: an unreadable archive is treated as having no
            # lines. Unlike a bare except, this lets KeyboardInterrupt and
            # SystemExit propagate.
            return []

    if "text" in ftype:
        with open(filename, 'r') as f:
            return f.readlines()

    # Neither gzip nor text: nothing to return.
    return []