Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
object_path = get_query_object_path(
settings[SettingsParameters.StateFilePath], object_name, object_version
)
logger.info(
f"Load endpoint: {object_name}, "
f"version: {object_version}, "
f"type: {object_type}"
)
if object_type == "alias":
msg = LoadObject(
object_name, obj_info["target"], object_version, False, "alias"
)
else:
local_path = object_path
msg = LoadObject(
object_name, local_path, object_version, False, object_type
)
python_service.manage_request(msg)
# Walk every endpoint returned by the state object and ask the python
# service to load it.
existing_pos = tabpy_state.get_endpoints()
for object_name, obj_info in existing_pos.items():
    object_version, object_type = obj_info["version"], obj_info["type"]
    object_path = get_query_object_path(
        settings[SettingsParameters.StateFilePath], object_name, object_version
    )

    logger.info(
        f"Load endpoint: {object_name}, "
        f"version: {object_version}, "
        f"type: {object_type}"
    )

    # An alias is loaded from its "target" entry instead of a query-object
    # path; every other endpoint type is loaded from the resolved path and
    # keeps its own type tag.
    is_alias = object_type == "alias"
    load_source = obj_info["target"] if is_alias else object_path
    load_kind = "alias" if is_alias else object_type

    # The fourth argument is always False on this path (the sibling
    # state-change code passes `is_update` in the same position).
    msg = LoadObject(object_name, load_source, object_version, False, load_kind)
    python_service.manage_request(msg)
)
else:
endpoint_info = new_endpoints[object_name]
is_update = object_version > 1
if object_type == "alias":
msg = LoadObject(
object_name,
endpoint_info["target"],
object_version,
is_update,
"alias",
)
else:
local_path = object_path
msg = LoadObject(
object_name, local_path, object_version, is_update, object_type
)
python_service.manage_request(msg)
wait_for_endpoint_loaded(python_service, object_name)
# cleanup old version of endpoint files
if object_version > 2:
cleanup_endpoint_files(
object_name,
settings[SettingsParameters.UploadDir],
logger=logger,
retain_versions=[object_version, object_version - 1],
)
except Exception as e:
def manage_request(self, msg):
    """Route a request message to the matching operation on ``self.ps``.

    The message's type selects the call: ``LoadObject`` is unpacked
    positionally into ``load_object``, ``DeleteObjects`` forwards its
    ``uris`` to ``delete_objects``, and ``FlushObjects``, ``CountObjects``
    and ``ListObjects`` map to the argument-less ``flush_objects``,
    ``count_objects`` and ``list_objects``. Any other message is wrapped
    in ``UnknownMessage``.

    Returns the value produced by the selected call. If anything inside
    raises, the exception is logged and, as far as the visible handler
    shows, nothing is returned explicitly, so the caller receives ``None``.
    """
    try:
        logger.debug(f"Received request {type(msg).__name__}")

        def _dispatch():
            # Guard clauses are checked in the same order as the message
            # types are listed in the docstring; the first match wins.
            if isinstance(msg, LoadObject):
                return self.ps.load_object(*msg)
            if isinstance(msg, DeleteObjects):
                return self.ps.delete_objects(msg.uris)
            if isinstance(msg, FlushObjects):
                return self.ps.flush_objects()
            if isinstance(msg, CountObjects):
                return self.ps.count_objects()
            if isinstance(msg, ListObjects):
                return self.ps.list_objects()
            return UnknownMessage(msg)

        reply = _dispatch()
        logger.debug(f"Returning response {reply}")
        return reply
    except Exception as exc:
        # Log with traceback; no explicit return follows, so the result
        # of a failed request is None.
        logger.exception(exc)
]
if not object_path and not object_version: # removal
logger.info(f"Removing object: URI={object_name}")
python_service.manage_request(DeleteObjects([object_name]))
cleanup_endpoint_files(
object_name, settings[SettingsParameters.UploadDir], logger=logger
)
else:
endpoint_info = new_endpoints[object_name]
is_update = object_version > 1
if object_type == "alias":
msg = LoadObject(
object_name,
endpoint_info["target"],
object_version,
is_update,
"alias",
)
else:
local_path = object_path
msg = LoadObject(
object_name, local_path, object_version, is_update, object_type
)
python_service.manage_request(msg)
wait_for_endpoint_loaded(python_service, object_name)
# cleanup old version of endpoint files