Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
resource_group=kv_settings.resourcegroup,
azure_region=kv_settings.azureregion,
)
# Create two vaults through the management client, then check that both
# names are reported by list_vaults().
vault_mgmt.create_vault("mynewvault")
vault_mgmt.create_vault("myothervault")
self.assertIn("mynewvault", vault_mgmt.list_vaults())
self.assertIn("myothervault", vault_mgmt.list_vaults())
# The URI reported for "mynewvault" must match the expected vault URL.
self.assertEqual(
vault_mgmt.get_vault_uri("mynewvault"), "https://mynewvault.vault.azure.net"
)
# Negative case: reload the settings, blank out the Azure region and build a
# second client without passing an azure_region argument.
kv_settings = get_kv_settings("msticpyconfig-kv.yaml")
kv_settings["azureregion"] = None
# Building the client and asking it to create a vault is expected to raise
# MPKeyVaultConfigException.
# NOTE(review): indentation was lost in this listing; the create_vault call
# presumably sits inside the `with` block - confirm against the real file.
with self.assertRaises(MPKeyVaultConfigException):
nr_vault_mgmt = BHKeyVaultMgmtClient(
tenant_id=kv_settings.tenantid,
subscription_id=kv_settings.subscriptionid,
resource_group=kv_settings.resourcegroup,
settings=kv_settings,
)
nr_vault_mgmt.create_vault("mynewvault")
if "KeyVault" in setting_item:
kv_val = setting_item.get("KeyVault")
def_vault_name = self._kv_settings.get("VaultName")
if not kv_val or kv_val.casefold() == "default":
# If no value, get the default VaultName from settings
# and use the setting path as the secret name
if not def_vault_name:
raise ValueError("No VaultName defined in KeyVault settings.")
secret_name = self.format_kv_name(setting_path)
return def_vault_name, secret_name
if "/" in kv_val:
# '/' delimited string means VaultName/Secret
vault_name, secret_name = kv_val.split("/")
return vault_name, self.format_kv_name(secret_name)
if not def_vault_name:
raise MPKeyVaultConfigException(
f"No VaultName defined in KeyVault settings for {setting_path}."
)
# If there is a single string - take that as the secret name
return def_vault_name, self.format_kv_name(kv_val)
return None, None
Returns
-------
str
Tenant Authority
Raises
------
MPKeyVaultConfigException
If tenant is not defined.
"""
auth = authority_uri or self.authority_uri.strip()
if not tenant:
tenant = self.get("tenantid")
if not tenant:
raise MPKeyVaultConfigException(
"Could not get tenant ID from params or config."
)
if auth.endswith("/"):
return auth + tenant.strip()
return auth + "/" + tenant.strip()
Returns
-------
Vault
The Vault object.
"""
if not self.azure_region:
raise MPKeyVaultConfigException(
"You must supply an Azure region when you create the client",
"in order to create new vaults.",
)
parameters = self._get_params()
cred = BasicTokenAuthentication({"access_token": self.auth_client.token})
if not self.resource_group:
raise MPKeyVaultConfigException(
"No value for resource_group in arguments or "
"KeyVault/ResourceGroup in settings."
)
if not self.azure_region:
raise MPKeyVaultConfigException(
"No value for azure_region in arguments "
"KeyVault/AzureRegion in settings."
)
mgmt = KeyVaultManagementClient(cred, self.subscription_id)
vault = mgmt.vaults.create_or_update(
self.resource_group, vault_name, parameters
).result()
return vault
"""
Create new or update existing vault.
Parameters
----------
vault_name : str
Name of the Vault
Returns
-------
Vault
The Vault object.
"""
if not self.azure_region:
raise MPKeyVaultConfigException(
"You must supply an Azure region when you create the client",
"in order to create new vaults.",
)
parameters = self._get_params()
cred = BasicTokenAuthentication({"access_token": self.auth_client.token})
if not self.resource_group:
raise MPKeyVaultConfigException(
"No value for resource_group in arguments or "
"KeyVault/ResourceGroup in settings."
)
if not self.azure_region:
raise MPKeyVaultConfigException(
"No value for azure_region in arguments "
"KeyVault/AzureRegion in settings."
)
mgmt = KeyVaultManagementClient(cred, self.subscription_id)
# With neither a URI nor a name supplied, fall back to the vault name held
# in settings; without that there is nothing to connect to.
if not (vault_uri or vault_name):
    if "vaultname" not in self.settings:
        raise MPKeyVaultMissingVaultException(
            "No vault name or URI was supplied."
        )
    vault_name = self.settings["vaultname"]

if vault_uri:
    # An explicit URI is used as given.
    self.vault_uri = vault_uri
else:
    # Otherwise build the URI from the settings' template, substituting the
    # vault name for the {vault} placeholder.
    vault_uri = self.settings.keyvault_uri
    if not vault_uri:
        raise MPKeyVaultConfigException(
            "Could not determine keyvault URI for cloud."
        )
    self.vault_uri = vault_uri.format(vault=vault_name)

if self.debug:
    print(f"Using Vault URI {self.vault_uri}")

self.kv_client = self._get_secret_client()
Raises
------
MsticpyConfigException
Missing or invalid configuration settings.
Notes
-----
Requires KeyVault settings to be defined in msticpyconfig.yaml
"""
# Load the KeyVault settings; an explicit tenant_id argument takes
# precedence over the configured value.
self._kv_settings = KeyVaultSettings()
self.tenant_id = tenant_id or self._kv_settings.get("tenantid")
if not self.tenant_id:
    # Single message string: passing two positional strings would make the
    # exception carry a tuple of fragments instead of one sentence.
    raise MPKeyVaultConfigException(
        "TenantID must be specified in KeyVault settings section "
        "in msticpyconfig.yaml"
    )

# Both mappings start empty and are typed as str -> str and
# str -> BHKeyVaultClient respectively.
self.kv_secret_vault: Dict[str, str] = {}
self.kv_vaults: Dict[str, BHKeyVaultClient] = {}

# Keyring use can be requested by argument or by the UseKeyring setting;
# the keyring client is only created when it is enabled.
self._use_keyring = use_keyring or self._kv_settings.get("UseKeyring", False)
if self._use_keyring:
    self._keyring_client = KeyringClient("Providers")
"""
if not self.azure_region:
raise MPKeyVaultConfigException(
"You must supply an Azure region when you create the client",
"in order to create new vaults.",
)
parameters = self._get_params()
cred = BasicTokenAuthentication({"access_token": self.auth_client.token})
if not self.resource_group:
raise MPKeyVaultConfigException(
"No value for resource_group in arguments or "
"KeyVault/ResourceGroup in settings."
)
if not self.azure_region:
raise MPKeyVaultConfigException(
"No value for azure_region in arguments "
"KeyVault/AzureRegion in settings."
)
mgmt = KeyVaultManagementClient(cred, self.subscription_id)
vault = mgmt.vaults.create_or_update(
self.resource_group, vault_name, parameters
).result()
return vault
self.debug = kwargs.pop("debug", False)
# Use the supplied settings object, or load a fresh one.
self.settings: KeyVaultSettings = settings or KeyVaultSettings()

# For each value below an explicit argument wins over the settings entry.
self.tenant_id = tenant_id or self.settings.get("tenantid")
if not self.tenant_id:
    # Message now reads "... in arguments or KeyVault/..." to match the
    # wording used for the resource_group error.
    raise MPKeyVaultConfigException(
        "No value for tenant_id in arguments or "
        "KeyVault/TenantID in settings."
    )

self.subscription_id = subscription_id or self.settings.get("subscriptionid")
if not self.subscription_id:
    raise MPKeyVaultConfigException(
        "No value for subscription_id in arguments or "
        "KeyVault/SubscriptionID in settings."
    )

# The management URI may be overridden with the mgmt_uri keyword argument.
self._client_uri = kwargs.pop("mgmt_uri", None) or self.settings.mgmt_uri
if not self._client_uri:
    raise MPKeyVaultConfigException(
        "Could not obtain an azure management URI from arguments or settings."
    )

self.auth_client = AuthClient(
    tenant_id=self.tenant_id,
    client_id=self.settings.CLIENT_ID,
    client_uri=self._client_uri,
    name="mgmt",
)

# Neither of these is validated here; create_vault checks them when it
# is called.
self.resource_group = resource_group or self.settings.get("resourcegroup")
self.azure_region = azure_region or self.settings.get("azureregion")