Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
warnings.simplefilter("ignore", category=UserWarning)
syslog_file = os.path.join(_TEST_DATA, "syslog_data.csv")
syslog_df = pd.read_csv(syslog_file, parse_dates=["TimeGenerated"])
heartbeat_file = os.path.join(_TEST_DATA, "host_hb.csv")
heartbeat_df = pd.read_csv(heartbeat_file)
az_net_file = os.path.join(_TEST_DATA, "az_net.csv")
az_net_df = pd.read_csv(az_net_file)
try:
host_record = ls.create_host_record(syslog_df, heartbeat_df, az_net_df)
assert type(host_record) == Host # nosec
assert host_record.OSType == "Linux" # nosec
except GeoIPDatabaseException:
# test will fail if no GeoIP database exists or can be downloaded
other_provider_settings = get_provider_settings(
config_section="OtherProviders"
).get("GeoIPLite", {})
geolite_key = None
if other_provider_settings:
geolite_key = other_provider_settings.args.get("AuthKey")
if not geolite_key:
warnings.resetwarnings()
warnings.warn(
message=(
"No configuration value found for GeoLite key. ",
+"Test test_host_data skipped.",
)
)
return
assert False
def _check_provider_settings(self, sec_settings):
    """
    Check provider secrets read via `sec_settings` against KV_SECRETS.

    For the OTX, VirusTotal and XForce providers, each configured
    argument is passed to ``sec_settings.read_secret`` and the result
    is compared with the corresponding ``KV_SECRETS`` entry. Other
    providers are ignored.

    Parameters
    ----------
    sec_settings
        Object exposing ``read_secret``, called with the provider
        argument value.

    """
    # provider name -> ordered (argument name, KV_SECRETS key) pairs to verify
    expected_secrets = {
        "OTX": (("AuthKey", "OTX-AuthKey"),),
        "VirusTotal": (("AuthKey", "TIProviders-VirusTotal-Args-AuthKey"),),
        "XForce": (("AuthKey", "XForce-AuthKey"), ("ApiID", "XForce-ApiID")),
    }
    for prov_name, prov_config in get_provider_settings().items():
        prov_args = prov_config.args
        for arg_name, secret_name in expected_secrets.get(prov_name, ()):
            actual_value = sec_settings.read_secret(prov_args[arg_name])
            self.assertEqual(KV_SECRETS[secret_name], actual_value)
def _get_geoip_provider_settings(provider_name: str) -> ProviderSettings:
    """
    Look up a provider in the "OtherProviders" configuration section.

    Parameters
    ----------
    provider_name : str
        Name of the provider to look up.

    Returns
    -------
    ProviderSettings
        The configured settings when the provider is present; otherwise
        a placeholder ProviderSettings carrying the requested name and
        the description "Not found.".

    """
    other_providers = get_provider_settings(config_section="OtherProviders")
    is_configured = provider_name in other_providers
    return (
        other_providers[provider_name]
        if is_configured
        else ProviderSettings(name=provider_name, description="Not found.")
    )
def _get_config_settings() -> Dict[Any, Any]:
    """
    Return the Splunk provider arguments from the msticpy configuration.

    Returns
    -------
    Dict[Any, Any]
        The ``args`` attribute of the "Splunk" entry in the
        "DataProviders" section, or an empty dict when there is no such
        entry (or it has no ``args`` attribute).

    """
    splunk_entry: Optional[ProviderSettings] = get_provider_settings(
        config_section="DataProviders"
    ).get("Splunk")
    # getattr with a default also covers the "no Splunk entry" (None) case.
    return getattr(splunk_entry, "args", {})
def connect(self, client_id: str = None, tenant_id: str = None, secret: str = None):
"""
Authenticate with the SDK.

Parameters
----------
client_id : str, optional
Client ID passed to ServicePrincipalCredentials.
tenant_id : str, optional
Tenant ID passed to ServicePrincipalCredentials.
secret : str, optional
Client secret passed to ServicePrincipalCredentials.

Raises
------
MsticpyAzureException
If no credentials were passed and the "DataProviders" settings
contain no "AzureCLI" entry.

Notes
-----
The configuration fallback is used only when all three arguments are
None. NOTE(review): if only some are supplied, the remaining None
values are passed to ServicePrincipalCredentials unchanged - confirm
this is intended.

"""
# Fall back to the "AzureCLI" entry of the msticpy configuration when no
# credentials were passed in.
if client_id is None and tenant_id is None and secret is None:
data_provs = get_provider_settings(config_section="DataProviders")
az_cli_config = data_provs.get("AzureCLI")
if not az_cli_config:
raise MsticpyAzureException(
"No AzureCLI configuration found in configuration settings."
)
# Credentials are read from the provider's args; a missing key raises
# KeyError here.
config_items = az_cli_config.args
client_id = config_items["clientId"]
tenant_id = config_items["tenantId"]
secret = config_items["clientSecret"]
# Create credentials and connect to the subscription client to validate
self.credentials = ServicePrincipalCredentials(
client_id=client_id, secret=secret, tenant=tenant_id
)
# NOTE(review): the body of this check is not present in this excerpt.
if not self.credentials:
def _load_providers(self):
"""Load provider classes based on config."""
prov_settings = get_provider_settings()
for provider_entry, settings in prov_settings.items():
# Allow overriding provider name to use another class
provider_name = settings.provider if settings.provider else provider_entry
if self._providers_to_load and provider_name not in self._providers_to_load:
continue
provider_class: TIProvider = getattr(
sys.modules[__name__], provider_name, None
)
if not provider_class:
warnings.warn(
f"Could not find provider class for {provider_name} "
+ f"in config section {provider_entry}"
)
continue