Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_expected_info_response(settings, tabpy_state):
return {
"description": tabpy_state.get_description(),
"creation_time": tabpy_state.creation_time,
"state_path": settings["state_file_path"],
"server_version": settings[SettingsParameters.ServerVersion],
"name": tabpy_state.name,
"versions": settings["versions"],
}
set_parameter(
SettingsParameters.EvaluateTimeout,
ConfigParameters.TABPY_EVALUATE_TIMEOUT,
default_val=30,
)
try:
self.settings[SettingsParameters.EvaluateTimeout] = float(
self.settings[SettingsParameters.EvaluateTimeout]
)
except ValueError:
logger.warning(
"Evaluate timeout must be a float type. Defaulting "
"to evaluate timeout of 30 seconds."
)
self.settings[SettingsParameters.EvaluateTimeout] = 30
pkg_path = os.path.dirname(tabpy.__file__)
set_parameter(
SettingsParameters.UploadDir,
ConfigParameters.TABPY_QUERY_OBJECT_PATH,
default_val=os.path.join(pkg_path, "tmp", "query_objects"),
)
if not os.path.exists(self.settings[SettingsParameters.UploadDir]):
os.makedirs(self.settings[SettingsParameters.UploadDir])
# set and validate transfer protocol
set_parameter(
SettingsParameters.TransferProtocol,
ConfigParameters.TABPY_TRANSFER_PROTOCOL,
default_val="http",
)
raise RuntimeError(msg)
protocol = self.settings[SettingsParameters.TransferProtocol]
if protocol == "http":
return
if protocol != "https":
msg = f"Unsupported transfer protocol: {protocol}"
logger.critical(msg)
raise RuntimeError(msg)
self._validate_cert_key_state(
"The parameter(s) {} must be set.",
SettingsParameters.CertificateFile in self.settings,
SettingsParameters.KeyFile in self.settings,
)
cert = self.settings[SettingsParameters.CertificateFile]
self._validate_cert_key_state(
"The parameter(s) {} must point to " "an existing file.",
os.path.isfile(cert),
os.path.isfile(self.settings[SettingsParameters.KeyFile]),
)
tabpy.tabpy_server.app.util.validate_cert(cert)
)
shutil.copy(state_file_template_path, state_file_path)
logger.info(f"Loading state from state file {state_file_path}")
tabpy_state = _get_state_from_file(state_file_dir)
self.tabpy_state = TabPyState(config=tabpy_state, settings=self.settings)
self.python_service = PythonServiceHandler(PythonService())
self.settings["compress_response"] = True
set_parameter(
SettingsParameters.StaticPath,
ConfigParameters.TABPY_STATIC_PATH,
default_val="./",
)
self.settings[SettingsParameters.StaticPath] = os.path.abspath(
self.settings[SettingsParameters.StaticPath]
)
logger.debug(
f"Static pages folder set to "
f'"{self.settings[SettingsParameters.StaticPath]}"'
)
# Set subdirectory from config if applicable
if tabpy_state.has_option("Service Info", "Subdirectory"):
self.subdirectory = "/" + tabpy_state.get("Service Info", "Subdirectory")
# If passwords file specified load credentials
set_parameter(ConfigParameters.TABPY_PWD_FILE, ConfigParameters.TABPY_PWD_FILE)
if ConfigParameters.TABPY_PWD_FILE in self.settings:
if not self._parse_pwd_file():
msg = (
"Failed to read passwords file "
].lower()
set_parameter(
SettingsParameters.CertificateFile, ConfigParameters.TABPY_CERTIFICATE_FILE
)
set_parameter(SettingsParameters.KeyFile, ConfigParameters.TABPY_KEY_FILE)
self._validate_transfer_protocol_settings()
# if state.ini does not exist try and create it - remove
# last dependence on batch/shell script
set_parameter(
SettingsParameters.StateFilePath,
ConfigParameters.TABPY_STATE_PATH,
default_val=os.path.join(pkg_path, "tabpy_server"),
)
self.settings[SettingsParameters.StateFilePath] = os.path.realpath(
os.path.normpath(
os.path.expanduser(self.settings[SettingsParameters.StateFilePath])
)
)
state_file_dir = self.settings[SettingsParameters.StateFilePath]
state_file_path = os.path.join(state_file_dir, "state.ini")
if not os.path.isfile(state_file_path):
state_file_template_path = os.path.join(
pkg_path, "tabpy_server", "state.ini.template"
)
logger.debug(
f"File {state_file_path} not found, creating from "
f"template {state_file_template_path}..."
)
shutil.copy(state_file_template_path, state_file_path)
def initialize(self, app):
self.tabpy_state = app.tabpy_state
# set content type to application/json
self.set_header("Content-Type", "application/json")
self.protocol = self.settings[SettingsParameters.TransferProtocol]
self.port = self.settings[SettingsParameters.Port]
self.python_service = app.python_service
self.credentials = app.credentials
self.username = None
self.password = None
self.eval_timeout = self.settings[SettingsParameters.EvaluateTimeout]
self.logger = ContextLoggerWrapper(self.request)
self.logger.enable_context_logging(
app.settings[SettingsParameters.LogRequestContext]
)
self.logger.log(logging.DEBUG, "Checking if need to handle authentication")
self.not_authorized = not self.handle_authentication("v1")
def on_state_change(
settings, tabpy_state, python_service, logger=logging.getLogger(__name__)
):
try:
logger.log(logging.INFO, "Loading state from state file")
config = util._get_state_from_file(
settings[SettingsParameters.StateFilePath], logger=logger
)
new_ps_state = TabPyState(config=config, settings=settings)
(has_changes, changes) = _get_latest_service_state(
settings, tabpy_state, new_ps_state, python_service
)
if not has_changes:
logger.info("Nothing changed, return.")
return
new_endpoints = new_ps_state.get_endpoints()
for object_name in changes["endpoints"]:
(object_type, object_version, object_path) = changes["endpoints"][
object_name
]
def initialize(self, app):
self.tabpy_state = app.tabpy_state
# set content type to application/json
self.set_header("Content-Type", "application/json")
self.protocol = self.settings[SettingsParameters.TransferProtocol]
self.port = self.settings[SettingsParameters.Port]
self.python_service = app.python_service
self.credentials = app.credentials
self.username = None
self.password = None
self.eval_timeout = self.settings[SettingsParameters.EvaluateTimeout]
self.logger = ContextLoggerWrapper(self.request)
self.logger.enable_context_logging(
app.settings[SettingsParameters.LogRequestContext]
)
self.logger.log(logging.DEBUG, "Checking if need to handle authentication")
self.not_authorized = not self.handle_authentication("v1")