Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init_ps_server(settings, tabpy_state):
logger.info("Initializing TabPy Server...")
existing_pos = tabpy_state.get_endpoints()
for (object_name, obj_info) in existing_pos.items():
try:
object_version = obj_info["version"]
get_query_object_path(
settings[SettingsParameters.StateFilePath], object_name, object_version
)
except Exception as e:
logger.error(
f"Exception encounted when downloading object: {object_name}"
f", error: {e}"
if "docstring" in request_data:
docstring = str(
bytes(request_data["docstring"], "utf-8").decode("unicode_escape")
)
else:
docstring = None
endpoint_type = request_data["type"] if "type" in request_data else None
methods = request_data["methods"] if "methods" in request_data else []
dependencies = (
request_data["dependencies"] if "dependencies" in request_data else None
)
target = request_data["target"] if "target" in request_data else None
schema = request_data["schema"] if "schema" in request_data else None
src_path = request_data["src_path"] if "src_path" in request_data else None
target_path = get_query_object_path(
self.settings[SettingsParameters.StateFilePath], name, version
)
self.logger.log(logging.DEBUG, f"Checking source path {src_path}...")
_path_checker = _compile(r"^[\\\:a-zA-Z0-9-_~\s/\.\(\)]+$")
# copy from staging
if src_path:
if not isinstance(request_data["src_path"], str):
raise gen.Return("src_path must be a string.")
if not _path_checker.match(src_path):
raise gen.Return(
"Endpoint source path name can only contain: "
"a-z, A-Z, 0-9, underscore, hyphens and spaces."
)
yield self._copy_po_future(src_path, target_path)
elif endpoint_type != "alias":
def init_model_evaluator(settings, tabpy_state, python_service):
"""
This will go through all models that the service currently have and
initialize them.
"""
logger.info("Initializing models...")
existing_pos = tabpy_state.get_endpoints()
for (object_name, obj_info) in existing_pos.items():
object_version = obj_info["version"]
object_type = obj_info["type"]
object_path = get_query_object_path(
settings[SettingsParameters.StateFilePath], object_name, object_version
)
logger.info(
f"Load endpoint: {object_name}, "
f"version: {object_version}, "
f"type: {object_type}"
)
if object_type == "alias":
msg = LoadObject(
object_name, obj_info["target"], object_version, False, "alias"
)
else:
local_path = object_path
msg = LoadObject(
object_name, local_path, object_version, False, object_type
if len(endpoints) == 0:
self.error_out(404, f"endpoint {name} does not exist.")
self.finish()
return
# update state
try:
endpoint_info = self.tabpy_state.delete_endpoint(name)
except Exception as e:
self.error_out(400, f"Error when removing endpoint: {e.message}")
self.finish()
return
# delete files
if endpoint_info["type"] != "alias":
delete_path = get_query_object_path(
self.settings["state_file_path"], name, None
)
try:
yield self._delete_po_future(delete_path)
except Exception as e:
self.error_out(400, f"Error while deleting: {e}")
self.finish()
return
self.set_status(204)
self.finish()
except Exception as e:
err_msg = format_exception(e, "delete endpoint")
self.error_out(500, err_msg)
self.finish()
endpoint_diff: Summary of what has changed, one entry for each changes
"""
# Shortcut when nothing is changed
changes = {"endpoints": {}}
# update endpoints
new_endpoints = new_ps_state.get_endpoints()
diff = {}
current_endpoints = python_service.ps.query_objects
for (endpoint_name, endpoint_info) in new_endpoints.items():
existing_endpoint = current_endpoints.get(endpoint_name)
if (existing_endpoint is None) or endpoint_info["version"] != existing_endpoint[
"version"
]:
# Either a new endpoint or new endpoint version
path_to_new_version = get_query_object_path(
settings[SettingsParameters.StateFilePath],
endpoint_name,
endpoint_info["version"],
)
endpoint_type = endpoint_info.get("type", "model")
diff[endpoint_name] = (
endpoint_type,
endpoint_info["version"],
path_to_new_version,
)
# add removed models too
for (endpoint_name, endpoint_info) in current_endpoints.items():
if endpoint_name not in new_endpoints.keys():
endpoint_type = current_endpoints[endpoint_name].get("type", "model")
diff[endpoint_name] = (endpoint_type, None, None)