Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _raise_adal_error(ex):
    """
    Raise a MsticpyKqlConnectionError for an ADAL authentication failure.

    Per the original note, this error is usually caused by a wrong tenant ID.

    Parameters
    ----------
    ex : Exception
        The authentication exception. Its first argument is compared with
        the "code_expired" polling message, and its ``error_response``
        attribute (if present) is read for an "error_description" entry.

    Raises
    ------
    MsticpyKqlConnectionError
        Always raised: titled "authentication timed out" if the polling
        code expired, otherwise "could not authenticate to tenant".

    """
    if ex.args and ex.args[0] == "Unexpected polling state code_expired":
        raise MsticpyKqlConnectionError(
            "Authentication request was not completed.",
            title="authentication timed out",
        )
    # error_response may be absent, None, or lack "error_description";
    # fall back to the exception text so the caller still gets the
    # connection error rather than a TypeError/KeyError from this helper.
    err_response = getattr(ex, "error_response", None)
    description = (
        err_response.get("error_description")
        if isinstance(err_response, dict)
        else None
    )
    if isinstance(description, str):
        ex_mssgs = description.split("\r\n")
    else:
        ex_mssgs = [str(ex)]
    raise MsticpyKqlConnectionError(
        *ex_mssgs, title="could not authenticate to tenant"
    )
def _raise_adal_error(ex):
    """
    Raise a MsticpyKqlConnectionError for an ADAL authentication failure.

    Per the original note, this error is usually caused by a wrong tenant ID.

    Parameters
    ----------
    ex : Exception
        The authentication exception. Its first argument is compared with
        the "code_expired" polling message, and its ``error_response``
        attribute (if present) is read for an "error_description" entry.

    Raises
    ------
    MsticpyKqlConnectionError
        Always raised: titled "authentication timed out" if the polling
        code expired, otherwise "could not authenticate to tenant".

    """
    if ex.args and ex.args[0] == "Unexpected polling state code_expired":
        raise MsticpyKqlConnectionError(
            "Authentication request was not completed.",
            title="authentication timed out",
        )
    # error_response may be absent, None, or lack "error_description";
    # fall back to the exception text so the caller still gets the
    # connection error rather than a TypeError/KeyError from this helper.
    err_response = getattr(ex, "error_response", None)
    description = (
        err_response.get("error_description")
        if isinstance(err_response, dict)
        else None
    )
    if isinstance(description, str):
        ex_mssgs = description.split("\r\n")
    else:
        ex_mssgs = [str(ex)]
    raise MsticpyKqlConnectionError(
        *ex_mssgs, title="could not authenticate to tenant"
    )
def _raise_kql_engine_error(ex):
    """
    Raise a MsticpyKqlConnectionError for a Kqlmagic KqlEngine failure.

    The raised error carries three fixed explanatory messages followed by
    every element of ``ex.args``, under the title "kql connection error".

    Parameters
    ----------
    ex : Exception
        The exception whose ``args`` are appended to the message list.

    Raises
    ------
    MsticpyKqlConnectionError
        Always raised.

    """
    tenant_hint = (
        "This can occur if you tried to connect to a second workspace using a"
        " different tenant ID - only a single tenant ID is supported in"
        " one notebook."
    )
    format_hint = (
        "Other causes of this error could be an invalid format of your"
        " connection string"
    )
    raise MsticpyKqlConnectionError(
        "An error was returned from Kqlmagic KqlEngine.",
        tenant_hint,
        format_hint,
        *ex.args,
        title="kql connection error",
    )
def _raise_kql_error(self, ex):
    """
    Raise a MsticpyKqlConnectionError for an error returned by the service.

    The first argument of `ex` is parsed as JSON and its "error" object is
    checked for the "WorkspaceNotFoundError" code. Any other code, or a
    payload that cannot be parsed, produces a generic response error that
    includes ``str(ex)``.

    Parameters
    ----------
    ex : Exception
        The exception raised while connecting.

    Raises
    ------
    MsticpyKqlConnectionError
        Always raised: titled "unknown workspace" for a
        WorkspaceNotFoundError, otherwise "Kql response error".

    """
    # A non-JSON payload, empty args, or a missing "error" object must not
    # mask the real failure with a JSONDecodeError/AttributeError from this
    # helper - fall through to the generic error in those cases.
    try:
        kql_err = json.loads(ex.args[0]).get("error")
    except (IndexError, TypeError, ValueError, AttributeError):
        kql_err = None
    if isinstance(kql_err, dict) and kql_err.get("code") == "WorkspaceNotFoundError":
        ex_mssgs = [
            "The workspace ID used to connect to Azure Sentinel could not be found.",
            "Please check that this is a valid workspace for your subscription",
        ]
        # Pull the workspace id out of the connection string (named group
        # "ws" of the class regex) so the user can see which one was used.
        ws_match = re.search(self._WS_RGX, self.current_connection, re.IGNORECASE)
        if ws_match:
            ws_name = ws_match.groupdict().get("ws")
            ex_mssgs.append(f"The workspace id used was {ws_name}.")
        ex_mssgs.append(f"The full connection string was {self.current_connection}")
        raise MsticpyKqlConnectionError(*ex_mssgs, title="unknown workspace")
    raise MsticpyKqlConnectionError(
        "The service returned the following error when connecting",
        str(ex),
        title="Kql response error",
    )
def _raise_kql_error(self, ex):
    """
    Raise a MsticpyKqlConnectionError for an error returned by the service.

    The first argument of `ex` is parsed as JSON and its "error" object is
    checked for the "WorkspaceNotFoundError" code. Any other code, or a
    payload that cannot be parsed, produces a generic response error that
    includes ``str(ex)``.

    Parameters
    ----------
    ex : Exception
        The exception raised while connecting.

    Raises
    ------
    MsticpyKqlConnectionError
        Always raised: titled "unknown workspace" for a
        WorkspaceNotFoundError, otherwise "Kql response error".

    """
    # A non-JSON payload, empty args, or a missing "error" object must not
    # mask the real failure with a JSONDecodeError/AttributeError from this
    # helper - fall through to the generic error in those cases.
    try:
        kql_err = json.loads(ex.args[0]).get("error")
    except (IndexError, TypeError, ValueError, AttributeError):
        kql_err = None
    if isinstance(kql_err, dict) and kql_err.get("code") == "WorkspaceNotFoundError":
        ex_mssgs = [
            "The workspace ID used to connect to Azure Sentinel could not be found.",
            "Please check that this is a valid workspace for your subscription",
        ]
        # Pull the workspace id out of the connection string (named group
        # "ws" of the class regex) so the user can see which one was used.
        ws_match = re.search(self._WS_RGX, self.current_connection, re.IGNORECASE)
        if ws_match:
            ws_name = ws_match.groupdict().get("ws")
            ex_mssgs.append(f"The workspace id used was {ws_name}.")
        ex_mssgs.append(f"The full connection string was {self.current_connection}")
        raise MsticpyKqlConnectionError(*ex_mssgs, title="unknown workspace")
    raise MsticpyKqlConnectionError(
        "The service returned the following error when connecting",
        str(ex),
        title="Kql response error",
    )
def _raise_authn_error(ex):
    """
    Raise a MsticpyKqlConnectionError reporting an authentication failure.

    The raised error carries two fixed messages followed by every element
    of ``ex.args``, under the title "authentication failed".

    Parameters
    ----------
    ex : Exception
        The exception whose ``args`` are appended to the message list.

    Raises
    ------
    MsticpyKqlConnectionError
        Always raised.

    """
    raise MsticpyKqlConnectionError(
        "The authentication failed.",
        "Please check the credentials you are using and permissions on the workspace",
        *ex.args,
        title="authentication failed",
    )