Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# packages to request in the build request
self.response_json["packages"] = self.database.transform_packages(
self.request["distro"],
self.outdated_version,
self.request["version"],
" ".join(self.request_json["installed"].keys()),
)
self.response_status = HTTPStatus.OK # 200
else:
self.response_json["packages"] = list(self.request_json["installed"].keys())
self.response_status = HTTPStatus.NO_CONTENT # 204
manifest_content = ""
for package, version in sorted(self.request_json["installed"].items()):
manifest_content += "{} - {}\n".format(package, version)
self.request["manifest_hash"] = get_hash(manifest_content, 15)
self.request["manifest"] = self.request_json["installed"]
if "version" in self.response_json or "upgrade_packages" in self.request_json:
# TODO: this results in double JSON encoding.
# The problem is that postgres already returns valid JSON, while the rest
# of the JSON response is a dict until it is serialized at the end.
self.response_json["upgrades"] = json.loads(
self.database.get_manifest_upgrades(self.request)
)
if self.response_json["upgrades"] != {}:
self.response_status = HTTPStatus.OK # 200
# finally respond
return self.respond()
def set_packages_hash(self):
    """Normalize the requested package list and store its hash.

    If ``self.params`` contains a ``"packages"`` entry, that entry is
    replaced by a sorted, de-duplicated list of its items, and
    ``self.params["packages_hash"]`` is set to the result of ``get_hash``
    applied to the space-joined names (with ``12`` as second argument).

    If there is no ``"packages"`` entry, both ``"packages"`` and
    ``"packages_hash"`` are set to the empty string.
    """
    if "packages" in self.params:
        # sorted() accepts any iterable, so the set does not need to be
        # copied into an intermediate list first.
        self.params["packages"] = sorted(set(self.params["packages"]))
        self.params["packages_hash"] = get_hash(
            " ".join(self.params["packages"]), 12
        )
    else:
        # NOTE(review): "packages" is a str here but a list in the branch
        # above; kept unchanged because callers may rely on it.
        self.params["packages"] = ""
        self.params["packages_hash"] = ""
else:
self.log.error("couldn't determine manifest")
print(manifest_content)
print(errors)
self.write_log(fail_log_path, stderr=errors)
self.database.set_requests_status(
self.params["request_hash"], "manifest_fail"
)
return False
# set directory where image is stored on server
self.image.set_image_dir()
self.log.debug("dir %s", self.image.params["dir"])
# calculate hash based on resulted manifest
self.image.params["image_hash"] = get_hash(
" ".join(self.image.as_array("manifest_hash")), 15
)
# set log path in case of success
success_log_path = self.image.params["dir"] + "/buildlog-{}.txt".format(
self.params["image_hash"]
)
# set build_status ahead, if stuff goes wrong it will be changed
self.build_status = "created"
# check if image already exists
if not self.image.created() or not self.database.image_exists(
self.params["image_hash"]
):
self.log.info("build image")
def __init__(self, params):
# Create the config, logger and database handles, keep a reference to
# ``params`` and make sure ``params["defaults_hash"]`` is populated.
# NOTE(review): indentation was lost in this copy of the file, so the exact
# nesting of the ``if`` statements below cannot be confirmed from here.
self.config = Config()
self.log = logging.getLogger(__name__)
self.log.info("config initialized")
# the database handle is built from the config created above
self.database = Database(self.config)
self.log.info("database initialized")
# stored by reference: the assignments below modify the caller's dict
self.params = params
# fall back to an empty hash when the caller supplied none
if "defaults_hash" not in self.params:
self.params["defaults_hash"] = ""
# a non-empty "defaults" value is hashed via get_hash(..., 32)
if "defaults" in self.params:
if self.params["defaults"] != "":
self.params["defaults_hash"] = get_hash(self.params["defaults"], 32)
# normalize any falsy hash value to the empty string
if not self.params["defaults_hash"]:
self.params["defaults_hash"] = ""
# validate attached defaults
if "defaults" in self.request_json:
if self.request_json["defaults"]:
# check if the uci file exceeds the max file size. this should
# be done as the uci-defaults are at least temporary stored in
# the database to be passed to a worker
if getsizeof(self.request_json["defaults"]) > self.config.get(
"max_defaults_size", 1024
):
self.response_json["error"] = "attached defaults exceed max size"
self.response_status = (
420
) # this error code is the best I could find
self.respond()
else:
self.request["defaults_hash"] = get_hash(
self.request_json["defaults"], 32
)
self.database.insert_defaults(
self.request["defaults_hash"], self.request_json["defaults"]
)
# add package_hash to database
if "packages" in self.request_json:
# check for existing packages
bad_packages = self.check_bad_packages(self.request_json["packages"])
if bad_packages:
return bad_packages
self.request["packages_hash"] = get_packages_hash(
self.request_json["packages"]
)
self.database.insert_packages_hash(
self.database.get_packages_hash(self.params["packages_hash"])
)
self.log.debug("packages_requested %s", packages_requested)
packages_remove = packages_image - packages_requested
self.log.debug("packages_remove %s", packages_remove)
packages_requested.update(set(map(lambda x: "-" + x, packages_remove)))
self.params["packages"] = " ".join(packages_requested)
self.log.debug("packages param %s", self.params["packages"])
else:
self.log.debug("build package with default packages")
# first determine the resulting manifest hash
return_code, manifest_content, errors = self.run_meta("manifest")
if return_code == 0:
self.image.params["manifest_hash"] = get_hash(manifest_content, 15)
manifest_pattern = r"(.+) - (.+)\n"
manifest_packages = re.findall(manifest_pattern, manifest_content)
self.database.add_manifest_packages(
self.image.params["manifest_hash"], manifest_packages
)
self.log.info("successfully parsed manifest")
else:
self.log.error("couldn't determine manifest")
print(manifest_content)
print(errors)
self.write_log(fail_log_path, stderr=errors)
self.database.set_requests_status(
self.params["request_hash"], "manifest_fail"
)
return False