Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_application_gateway(self, id):
    """Fetch the application gateway identified by the resource ID ``id``.

    The ID is split with ``parse_resource_id``. When the parsed result has no
    ``resource_group`` entry, ``self.resource_group`` is used instead.

    Returns whatever ``self.network_client.application_gateways.get`` returns.
    A ``CloudError`` raised by the lookup is reported through ``self.fail``;
    if ``self.fail`` returns normally, this method returns ``None``.
    """
    parts = parse_resource_id(id)
    try:
        fetch = self.network_client.application_gateways.get
        return fetch(
            parts.get('resource_group', self.resource_group),
            parts.get('name'),
        )
    except CloudError as exc:
        message = "Error fetching application_gateway {0} - {1}".format(id, str(exc))
        self.fail(message)
raise CLIError('Please provide either --aad-client-cert-thumbprint or --aad-client-secret')
if volume_type is None:
if not is_linux:
volume_type = _ALL_VOLUME_TYPE
elif vm.storage_profile.data_disks:
raise CLIError('VM has data disks, please supply --volume-type')
else:
volume_type = 'OS'
# sequence_version should be unique
sequence_version = uuid.uuid4()
# retrieve keyvault details
disk_encryption_keyvault_url = get_key_vault_base_url(
cmd.cli_ctx, (parse_resource_id(disk_encryption_keyvault))['name'])
# disk encryption key itself can be further protected, so let us verify
if key_encryption_key:
key_encryption_keyvault = key_encryption_keyvault or disk_encryption_keyvault
# to avoid bad server errors, ensure the vault has the right configurations
_verify_keyvault_good_for_encryption(cmd.cli_ctx, disk_encryption_keyvault, key_encryption_keyvault, vm, force)
# if key name and not key url, get url.
if key_encryption_key and '://' not in key_encryption_key: # if key name and not key url
key_encryption_key = _get_keyvault_key_url(
cmd.cli_ctx, (parse_resource_id(key_encryption_keyvault))['name'], key_encryption_key)
# 2. we are ready to provision/update the disk encryption extensions
# The following logic was mostly ported from xplat-cli
public_config = {
def update_tracing_config(cmd, resource_group, service_name, location, resource_properties, app_insights_key,
app_insights, disable_distributed_tracing):
create_app_insights = False
if app_insights_key is not None:
resource_properties.trace = models.TraceProperties(
enabled=True, app_insight_instrumentation_key=app_insights_key)
elif app_insights is not None:
if is_valid_resource_id(app_insights):
resource_id_dict = parse_resource_id(app_insights)
instrumentation_key = get_app_insights_key(cmd.cli_ctx, resource_id_dict['resource_group'],
resource_id_dict['resource_name'])
resource_properties.trace = models.TraceProperties(
enabled=True, app_insight_instrumentation_key=instrumentation_key)
else:
instrumentation_key = get_app_insights_key(cmd.cli_ctx, resource_group, app_insights)
resource_properties.trace = models.TraceProperties(
enabled=True, app_insight_instrumentation_key=instrumentation_key)
elif disable_distributed_tracing is not True:
create_app_insights = True
if create_app_insights is True:
try:
instrumentation_key = try_create_application_insights(cmd, resource_group, service_name, location)
if instrumentation_key is not None:
resource_properties.trace = models.TraceProperties(
def _verify_keyvault_good_for_encryption(cli_ctx, disk_vault_id, kek_vault_id, vm_or_vmss, force):
def _report_client_side_validation_error(msg):
if force:
logger.warning("WARNING: %s %s", msg, "Encryption might fail.")
else:
from knack.util import CLIError
raise CLIError("ERROR: {}".format(msg))
resource_type = "VMSS" if vm_or_vmss.type.lower().endswith("virtualmachinescalesets") else "VM"
from azure.cli.core.commands.client_factory import get_mgmt_service_client
from azure.cli.core.profiles import ResourceType
from msrestazure.tools import parse_resource_id
client = get_mgmt_service_client(cli_ctx, ResourceType.MGMT_KEYVAULT).vaults
disk_vault_resource_info = parse_resource_id(disk_vault_id)
key_vault = client.get(disk_vault_resource_info['resource_group'], disk_vault_resource_info['name'])
# ensure vault has 'EnabledForDiskEncryption' permission
if not key_vault.properties.enabled_for_disk_encryption:
_report_client_side_validation_error("Keyvault '{}' is not enabled for disk encryption.".format(
disk_vault_resource_info['resource_name']))
if kek_vault_id:
kek_vault_info = parse_resource_id(kek_vault_id)
if disk_vault_resource_info['name'].lower() != kek_vault_info['name'].lower():
client.get(kek_vault_info['resource_group'], kek_vault_info['name'])
# verify subscription matches
vm_vmss_resource_info = parse_resource_id(vm_or_vmss.id)
if vm_vmss_resource_info['subscription'].lower() != disk_vault_resource_info['subscription'].lower():
_report_client_side_validation_error("{} {}'s subscription does not match keyvault's subscription."
def parse_domain_name(domain):
    """Return the ``resource_name`` component of ``domain``.

    ``domain`` is only parsed when ``is_valid_resource_id`` accepts it;
    otherwise ``None`` is returned.
    """
    from msrestazure.tools import parse_resource_id, is_valid_resource_id

    # Guard clause: anything that is not a resource ID yields no name.
    if not is_valid_resource_id(domain):
        return None
    return parse_resource_id(domain)['resource_name']
def _validator(cmd, namespace):
    """Locate the network watcher for ``namespace.location`` and record it.

    Lists all network watchers and picks the first whose location matches
    ``namespace.location`` case-insensitively. The watcher's resource group
    and name (parsed from its ID) are stored on ``namespace`` under the
    attribute names held in ``rg_name`` and ``watcher_name``. When ``remove``
    is truthy, ``namespace.location`` is deleted afterwards.

    NOTE(review): ``rg_name``, ``watcher_name`` and ``remove`` are not defined
    in this function; presumably they come from an enclosing scope that is
    not visible here - confirm.

    Raises:
        CLIError: if no watcher matches the requested location.
    """
    from msrestazure.tools import parse_resource_id

    region = namespace.location
    watchers = get_mgmt_service_client(cmd.cli_ctx, CUSTOM_NW_CONNECTION_MONITOR).network_watchers

    # First watcher in the same region wins; the comparison ignores case.
    match = None
    for candidate in watchers.list_all():
        if candidate.location.lower() == region.lower():
            match = candidate
            break
    if not match:
        raise CLIError("network watcher is not enabled for region '{}'.".format(region))

    watcher_id = parse_resource_id(match.id)
    for attr_name, id_key in ((rg_name, 'resource_group'), (watcher_name, 'name')):
        setattr(namespace, attr_name, watcher_id[id_key])

    if remove:
        del namespace.location
client = web_client_factory(cmd.cli_ctx)
plan_info = None
if consumption_plan_location:
locations = list_consumption_locations(cmd)
location = next((l for l in locations if l['name'].lower() == consumption_plan_location.lower()), None)
if location is None:
raise CLIError("Location is invalid. Use: az functionapp list-consumption-locations")
functionapp_def.location = consumption_plan_location
functionapp_def.kind = 'functionapp'
# if os_type is None, the os type is windows
is_linux = os_type and os_type.lower() == 'linux'
else: # apps with SKU based plan
if is_valid_resource_id(plan):
parse_result = parse_resource_id(plan)
plan_info = client.app_service_plans.get(parse_result['resource_group'], parse_result['name'])
else:
plan_info = client.app_service_plans.get(resource_group_name, plan)
if not plan_info:
raise CLIError("The plan '{}' doesn't exist".format(plan))
location = plan_info.location
is_linux = plan_info.reserved
functionapp_def.server_farm_id = plan
functionapp_def.location = location
if is_linux and not runtime and (consumption_plan_location or not deployment_container_image_name):
raise CLIError(
"usage error: --runtime RUNTIME required for linux functions apps without custom image.")
if runtime:
if is_linux and runtime not in LINUX_RUNTIMES:
def update_functionapp(cmd, instance, plan=None):
    """Point ``instance`` at a different app service plan when ``plan`` is given.

    ``plan`` may be a full resource ID or a bare plan name. A bare name is
    looked up in ``instance.resource_group``. The switch is checked with
    ``validate_plan_switch_compatibility`` before ``instance.server_farm_id``
    is set to the destination plan's ID.

    Returns:
        ``instance`` - modified in place when ``plan`` is not ``None``,
        otherwise untouched.

    Raises:
        CLIError: if the plan lookup returns ``None``.
    """
    client = web_client_factory(cmd.cli_ctx)
    if plan is None:
        return instance

    # Work out which (resource group, plan name) pair to query.
    if is_valid_resource_id(plan):
        plan_parts = parse_resource_id(plan)
        group, name = plan_parts['resource_group'], plan_parts['name']
    else:
        group, name = instance.resource_group, plan

    target_plan = client.app_service_plans.get(group, name)
    if target_plan is None:
        raise CLIError("The plan '{}' doesn't exist".format(plan))

    validate_plan_switch_compatibility(client, instance, target_plan)
    instance.server_farm_id = target_plan.id
    return instance
def get_source_vm(self):
    """Return the virtual machine referred to by ``self.source``.

    ``self.source`` is turned into a resource ID with ``format_resource_id``
    (using ``self.subscription_id``, the ``Microsoft.Compute`` namespace, the
    ``virtualMachines`` type and ``self.resource_group``) and then parsed.

    Returns:
        The result of ``self.get_vm`` for the parsed resource group and name,
        or ``None`` when the parsed type is not ``virtualMachines``.
    """
    source_id = format_resource_id(
        self.source,
        self.subscription_id,
        'Microsoft.Compute',
        'virtualMachines',
        self.resource_group,
    )
    parsed = parse_resource_id(source_id)

    # Anything that does not parse as a virtual machine is not a usable source.
    if parsed['type'] != 'virtualMachines':
        return None
    return self.get_vm(parsed['resource_group'], parsed['name'])
def get_network_watcher_from_vm(cmd, namespace):
    """Set ``namespace.location`` from the VM, then resolve its network watcher.

    The VM name is parsed out of ``namespace.vm`` and the VM is fetched from
    ``namespace.resource_group_name``. Its location is copied onto
    ``namespace.location`` before the callable returned by
    ``get_network_watcher_from_location()`` is invoked with ``cmd`` and
    ``namespace``.
    """
    from msrestazure.tools import parse_resource_id

    vm_client = get_mgmt_service_client(cmd.cli_ctx, ResourceType.MGMT_COMPUTE).virtual_machines
    parsed_vm = parse_resource_id(namespace.vm)
    machine = vm_client.get(namespace.resource_group_name, parsed_vm['name'])
    namespace.location = machine.location  # pylint: disable=no-member

    locate_watcher = get_network_watcher_from_location()
    locate_watcher(cmd, namespace)