Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_app(self):
    """Create a ``TabPyApp`` with default arguments and expose its web app.

    The new application is stored on ``self.app``; the return value is
    whatever its ``_create_tornado_web_app()`` produces.
    """
    tabpy_app = TabPyApp()
    self.app = tabpy_app
    return tabpy_app._create_tornado_web_app()
mock_path_exists,
mock_psws,
mock_management_util,
mock_tabpy_state,
mock_parse_arguments,
):
pkg_path = os.path.dirname(tabpy.__file__)
obj_path = os.path.join(pkg_path, "tmp", "query_objects")
state_path = os.path.join(pkg_path, "tabpy_server")
mock_os.environ = {
"TABPY_PORT": "9004",
"TABPY_QUERY_OBJECT_PATH": obj_path,
"TABPY_STATE_PATH": state_path,
}
TabPyApp(None)
self.assertEqual(len(mock_psws.mock_calls), 1)
self.assertEqual(len(mock_tabpy_state.mock_calls), 1)
self.assertEqual(len(mock_path_exists.mock_calls), 1)
self.assertTrue(len(mock_management_util.mock_calls) > 0)
mock_os.makedirs.assert_not_called()
def test_given_multiple_credentials_expect_all_parsed(self):
    """Each login/password pair written to the password file must show up
    in ``app.credentials`` with the same password."""
    self._set_file(
        self.config_file.name, "[TabPy]\n" f"TABPY_PWD_FILE = {self.pwd_file.name}"
    )

    expected = {"user_1": "pwd_1", "user@2": "pwd@2", "user#3": "pwd#3"}

    # One "<login> <password>" line per entry, each newline-terminated.
    pwd_file_text = "".join(
        f"{user} {secret}\n" for user, secret in expected.items()
    )
    self._set_file(self.pwd_file.name, pwd_file_text)

    app = TabPyApp(self.config_file.name)

    # Same set of logins on both sides, then per-login password equality.
    self.assertCountEqual(expected, app.credentials)
    for user, secret in expected.items():
        self.assertIn(user, app.credentials)
        self.assertEqual(secret, app.credentials[user])
def test_http(self):
    """A config declaring ``TABPY_TRANSFER_PROTOCOL = http`` must yield
    ``settings["transfer_protocol"] == "http"``."""
    config_text = "[TabPy]\n" "TABPY_TRANSFER_PROTOCOL = http"
    self.fp.write(config_text)
    # Close the file before handing its name to TabPyApp.
    self.fp.close()

    protocol = TabPyApp(self.fp.name).settings["transfer_protocol"]
    self.assertEqual(protocol, "http")
def assertTabPyAppRaisesRuntimeError(self, expected_message):
    """Assert that building ``TabPyApp`` from ``self.fp.name`` raises
    ``RuntimeError`` whose first argument equals *expected_message*."""
    with self.assertRaises(RuntimeError) as raised:
        TabPyApp(self.fp.name)

    # Checked after the context exits: statements following the raising
    # call inside the ``with`` body would never execute.
    actual_message = raised.exception.args[0]
    self.assertEqual(actual_message, expected_message)
def test_given_no_pwd_file_expect_empty_credentials_list(self):
    """With no ``TABPY_PWD_FILE`` entry in the config, ``app.credentials``
    must be an empty dict."""
    config_text = "[TabPy]\n" "TABPY_TRANSFER_PROTOCOL = http"
    self._set_file(self.config_file.name, config_text)

    credentials = TabPyApp(self.config_file.name).credentials

    self.assertDictEqual(
        credentials,
        {},
        "Expected no credentials with no password file provided",
    )
def test_given_duplicate_usernames_expect_parsing_fails(self):
    """A password file listing the same login twice must make ``TabPyApp``
    raise ``RuntimeError`` naming that password file."""
    self._set_file(
        self.config_file.name, "[TabPy]\n" f"TABPY_PWD_FILE = {self.pwd_file.name}"
    )

    user = "user_name_123"
    hashed = "hashedpw"
    # File layout: a comment line, a blank line, then the same entry twice.
    self._set_file(
        self.pwd_file.name, "# passwords\n" "\n" f"{user} {hashed}\n{user} {hashed}"
    )

    with self.assertRaises(RuntimeError) as raised:
        TabPyApp(self.config_file.name)

    # Inspect the captured exception once the context manager has exited.
    expected_message = f"Failed to read password file {self.pwd_file.name}"
    self.assertEqual(expected_message, raised.exception.args[0])
def test_given_missing_pwd_file_expect_app_fails(self):
    """A config whose ``TABPY_PWD_FILE`` is ``foo`` must make ``TabPyApp``
    raise ``RuntimeError``."""
    config_text = "[TabPy]\n" "TABPY_PWD_FILE = foo"
    self._set_file(self.config_file.name, config_text)

    with self.assertRaises(RuntimeError) as raised:
        TabPyApp(self.config_file.name)

    # NOTE(review): the config above names "foo", yet the expected message
    # is built from self.pwd_file.name — confirm which path the app reports.
    expected_message = f"Failed to read password file {self.pwd_file.name}"
    self.assertEqual(expected_message, raised.exception.args[0])
def test_custom_evaluate_timeout_invalid(
    self, mock_state, mock_get_state_from_file, mock_path_exists
):
    """A non-numeric ``TABPY_EVALUATE_TIMEOUT`` must leave
    ``settings["evaluate_timeout"]`` at 30.0.

    The ``mock_*`` parameters are not used in the body; presumably they are
    injected by patch decorators outside this view — TODO confirm.
    """
    self.assertIsNotNone(self.config_file)

    # Adjacent literals are joined first, so the whole text is encoded.
    config_bytes = (
        "[TabPy]\n" 'TABPY_EVALUATE_TIMEOUT = "im not a float"'
    ).encode()
    self.config_file.write(config_bytes)
    self.config_file.close()

    settings = TabPyApp(self.config_file.name).settings
    self.assertEqual(settings["evaluate_timeout"], 30.0)
def main():
    """Create a ``TabPyApp`` with default arguments and run it."""
    # Imported inside the function, so the import happens on call rather
    # than when this module is loaded.
    from tabpy.tabpy_server.app.app import TabPyApp

    TabPyApp().run()