Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
MsticpyUserError,
)
BASE_EX_CASES: List[Any] = [
MsticpyException,
MsticpyConfigException,
MsticpyResourceException,
]
USER_EX_CASES: List[Any] = [
MsticpyAzureConfigError,
MsticpyKeyVaultConfigError,
MsticpyKeyVaultMissingSecretError,
MsticpyNoDataSourceError,
MsticpyNotConnectedError,
MsticpyUserConfigError,
MsticpyUserError,
]
_TEST_ARG = "test arg"
_TEST_URI = "https://msticpy.readthedocs.org/test"
_OTHER_URI = "https://msticpy.readthedocs.org/test2"
_TEST_TITLE = "test error"
_TEST_EX_CASES: List[Tuple] = []
for case in USER_EX_CASES:
tst_kwargs = dict(help_uri=_TEST_URI, title=_TEST_TITLE, other_uri=_OTHER_URI)
_TEST_EX_CASES.append((case, [_TEST_ARG], tst_kwargs))
# pylint: disable=protected-access
def _create_and_capture_exception(ex_cls, *args, **kwargs):
def test_splunk_connect_no_params(splunk_client):
"""Check failure with no args."""
splunk_client.connect = cli_connect
sp_driver = SplunkDriver()
check.is_true(sp_driver.loaded)
with pytest.raises(MsticpyUserConfigError) as mp_ex:
sp_driver.connect()
check.is_false(sp_driver.connected)
check.is_in("no Splunk connection parameters", mp_ex.value.args)
provider_class: TIProvider = getattr(
sys.modules[__name__], provider_name, None
)
if not provider_class:
warnings.warn(
f"Could not find provider class for {provider_name} "
+ f"in config section {provider_entry}"
)
continue
# instantiate class sending args from settings to init
try:
provider_instance = provider_class(**(settings.args))
except MsticpyConfigException as mp_ex:
# If the TI Provider didn't load, raise an exception
raise MsticpyUserConfigError(
f"Could not load TI Provider {provider_name}",
*mp_ex.args,
"To avoid loading this provider please use the 'providers' parameter"
+ " to TILookup() to specify which providers to load.",
title="TIProvider configuration error",
help_uri="https://msticpy.readthedocs.io/data_acquisition/"
+ "TIProviders.html#configuration-file",
)
# set the description from settings, if one is provided, otherwise
# use class docstring.
provider_instance.description = (
settings.description
if settings.description
else provider_instance.__doc__
)
"Or ensure that a copy of this file is in the current directory.",
]
if args:
add_args = [*args, *mp_loc_mssg]
else:
add_args = [def_mssg, *mp_loc_mssg]
if help_uri:
uri: Union[Tuple[str, str], str] = help_uri
add_uris = {"basehelp_uri": self.DEF_HELP_URI}
else:
uri = self.DEF_HELP_URI
add_uris = {}
super().__init__(*add_args, help_uri=uri, **add_uris, **kwargs)
class MsticpyKeyVaultConfigError(MsticpyUserConfigError):
"""Key Vault configuration exception."""
DEF_HELP_URI = (
"Using keyvault to store msticpy secrets",
"https://msticpy.readthedocs.io/en/latest/getting_started/msticpyconfig.html"
+ "#specifying-secrets-as-key-vault-secrets",
)
def __init__(
self, *args, help_uri: Union[Tuple[str, str], str, None] = None, **kwargs
):
"""
Create Key Vault configuration exception.
Parameters
----------
Parameters
----------
help_uri : Union[Tuple[str, str], str, None], optional
Override the default help URI.
"""
mssg = (
"Please verfiy that the item using this secret is properly"
+ " configured in in your msticpyconfig.yaml."
)
add_args = [*args, mssg]
uri = help_uri or self.DEF_HELP_URI
super().__init__(*add_args, help_uri=uri, **kwargs)
class MsticpyAzureConfigError(MsticpyUserConfigError):
"""Exception class for AzureData."""
DEF_HELP_URI = (
"Using the Azure API connector",
"https://msticpy.readthedocs.io/en/latest/data_acquisition/AzureData.html"
+ "#instantiating-and-connecting-with-an-azure-data-connector",
)
def __init__(
self, *args, help_uri: Union[Tuple[str, str], str, None] = None, **kwargs
):
"""
Create Azure data missing configuration exception.
Parameters
----------
def _search_for_file(self, pattern: str) -> str:
config_file = None
searched_configs = list(Path("..").glob(pattern))
if not searched_configs:
raise MsticpyUserConfigError(
*_NO_CONFIG_ERR, title="Workspace configuration missing."
)
for found_file in searched_configs:
test_content = self._read_config_values(str(found_file))
if "workspace_id" in test_content:
config_file = found_file
break
if config_file is None:
raise MsticpyUserConfigError(
*_NO_CONFIG_ERR, title="Workspace configuration missing."
)
# Warn that we're using a "found" file
warnings.warn("\n".join(_NO_CONFIG_WARN).format(config_file=config_file))
return str(config_file)
if api_key:
self._api_key = api_key
else:
self._api_key = self.settings.args.get("AuthKey") # type: ignore
self._dbfolder = db_folder
if self._dbfolder is None:
self._dbfolder = self.settings.args.get("DBFolder", self._DB_HOME)
self._dbfolder = str(Path(self._dbfolder).expanduser()) # type: ignore
self._force_update = force_update
self._auto_update = auto_update
self._check_and_update_db(self._dbfolder, self._force_update, self._auto_update)
self._dbpath = self._get_geoip_dbpath(self._dbfolder)
if not self._dbpath:
raise MsticpyUserConfigError(
"No usable GeoIP Database could be found.",
"Check that you have correctly configured the Maxmind API key.",
(
"If you are using a custom DBFolder setting in your config, "
+ "check that this is a valid path."
),
help_uri=(
"https://msticpy.readthedocs.io/data_acquisition/"
+ "GeoIPLookups.html#maxmind-geo-ip-lite-lookup-class"
),
service_uri="https://www.maxmind.com/en/geolite2/signup",
title="Maxmind GeoIP database not found",
)
self._reader = geoip2.database.Reader(self._dbpath)