Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.remote_names = {}
self.remote_dict = {}
self.must_include = {
"distro": {},
"profile": {},
"system": {},
"image": {},
"repo": {},
"mgmtclass": {},
"package": {},
"file": {}
}
for ot in OBJ_TYPES:
self.remote_names[ot] = list(utils.lod_to_dod(self.remote_data[ot], "name").keys())
self.remote_dict[ot] = utils.lod_to_dod(self.remote_data[ot], "name")
if self.sync_all:
for names in self.remote_dict[ot]:
self.must_include[ot][names] = 1
self.logger.debug("remote names struct is %s" % self.remote_names)
if not self.sync_all:
# include all profiles that are matched by a pattern
for obj_type in OBJ_TYPES:
patvar = getattr(self, "%s_patterns" % obj_type)
self.logger.debug("* Finding Explicit %s Matches" % obj_type)
for pat in patvar:
for remote in self.remote_names[obj_type]:
self.logger.debug("?: seeing if %s looks like %s" % (remote, pat))
if fnmatch.fnmatch(remote, pat):
def remove_objects_not_on_master(self, obj_type):
locals = utils.lod_to_dod(self.local_data[obj_type], "uid")
remotes = utils.lod_to_dod(self.remote_data[obj_type], "uid")
for (luid, ldata) in list(locals.items()):
if luid not in remotes:
try:
self.logger.info("removing %s %s" % (obj_type, ldata["name"]))
self.api.remove_item(obj_type, ldata["name"], recursive=True, logger=self.logger)
except Exception:
utils.log_exc(self.logger)
def add_objects_not_on_local(self, obj_type):
locals = utils.lod_to_dod(self.local_data[obj_type], "uid")
remotes = utils.lod_sort_by_key(self.remote_data[obj_type], "depth")
for rdata in remotes:
# do not add the system if it is not on the transfer list
if not rdata["name"] in self.must_include[obj_type]:
continue
if not rdata["uid"] in locals:
creator = getattr(self.api, "new_%s" % obj_type)
newobj = creator()
newobj.from_dict(rdata)
try:
self.logger.info("adding %s %s" % (obj_type, rdata["name"]))
if not self.api.add_item(obj_type, newobj, logger=self.logger):
self.logger.error("failed to add %s %s" % (obj_type, rdata["name"]))
self.remote_names = {}
self.remote_dict = {}
self.must_include = {
"distro": {},
"profile": {},
"system": {},
"image": {},
"repo": {},
"mgmtclass": {},
"package": {},
"file": {}
}
for ot in OBJ_TYPES:
self.remote_names[ot] = list(utils.lod_to_dod(self.remote_data[ot], "name").keys())
self.remote_dict[ot] = utils.lod_to_dod(self.remote_data[ot], "name")
if self.sync_all:
for names in self.remote_dict[ot]:
self.must_include[ot][names] = 1
self.logger.debug("remote names struct is %s" % self.remote_names)
if not self.sync_all:
# include all profiles that are matched by a pattern
for obj_type in OBJ_TYPES:
patvar = getattr(self, "%s_patterns" % obj_type)
self.logger.debug("* Finding Explicit %s Matches" % obj_type)
for pat in patvar:
for remote in self.remote_names[obj_type]:
self.logger.debug("?: seeing if %s looks like %s" % (remote, pat))
if fnmatch.fnmatch(remote, pat):
self.logger.debug("Adding %s for pattern match %s." % (remote, pat))
def remove_objects_not_on_master(self, obj_type):
locals = utils.lod_to_dod(self.local_data[obj_type], "uid")
remotes = utils.lod_to_dod(self.remote_data[obj_type], "uid")
for (luid, ldata) in list(locals.items()):
if luid not in remotes:
try:
self.logger.info("removing %s %s" % (obj_type, ldata["name"]))
self.api.remove_item(obj_type, ldata["name"], recursive=True, logger=self.logger)
except Exception:
utils.log_exc(self.logger)
def replace_objects_newer_on_remote(self, obj_type):
locals = utils.lod_to_dod(self.local_data[obj_type], "uid")
remotes = utils.lod_to_dod(self.remote_data[obj_type], "uid")
for (ruid, rdata) in list(remotes.items()):
# do not add the system if it is not on the transfer list
if not rdata["name"] in self.must_include[obj_type]:
continue
if ruid in locals:
ldata = locals[ruid]
if ldata["mtime"] < rdata["mtime"]:
if ldata["name"] != rdata["name"]:
self.logger.info("removing %s %s" % (obj_type, ldata["name"]))
self.api.remove_item(obj_type, ldata["name"], recursive=True, logger=self.logger)
creator = getattr(self.api, "new_%s" % obj_type)
newobj = creator()
newobj.from_dict(rdata)