Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_parsed_resource_ids(resource_ids):
"""
Returns a generator of parsed resource ids. Raise when there is invalid resource id.
"""
if not resource_ids:
return None
for rid in resource_ids:
if not is_valid_resource_id(rid):
raise CLIError('az resource: error: argument --ids: invalid ResourceId value: \'%s\'' % rid)
return ({'resource_id': rid} for rid in resource_ids)
def get_id_from_azure_resource(cli_ctx, app, resource_group=None):
    """
    Resolve `app` to an application id.

    - If `app` is a valid resource id, it is parsed and the component is
      fetched from the subscription and resource group named in that id;
      the component's `app_id` is returned.
    - Otherwise, if `resource_group` is given, `app` is treated as a
      component name inside that group and its `app_id` is returned.
    - Otherwise `app` is returned unchanged.
    """
    api_version = '2015-05-01'

    if is_valid_resource_id(app):
        id_parts = parse_resource_id(app)
        group_from_id = id_parts["resource_group"]
        component_name = id_parts["name"]
        subscription_from_id = id_parts["subscription"]
        components = applicationinsights_mgmt_plane_client(
            cli_ctx,
            subscription_id=subscription_from_id,
            api_version=api_version).components
        return components.get(group_from_id, component_name).app_id

    # Without a resource group there is nothing to look up: hand the value back as-is.
    if not resource_group:
        return app

    components = applicationinsights_mgmt_plane_client(cli_ctx, api_version=api_version).components
    return components.get(resource_group, app).app_id
def simple_validator(cmd, namespace):
    """
    Normalize namespace.subnet into a full subnet resource id.

    Accepted combinations are a subnet resource id on its own, or a subnet
    name together with a virtual network name. Nothing happens when both
    values are None or when subnet is the empty string. Any other
    combination raises ValueError. A subnet given by name is replaced in
    place with an id built from the current subscription,
    namespace.resource_group_name and the virtual network name.
    """
    vnet_name = namespace.virtual_network_name
    subnet = namespace.subnet

    if (vnet_name is None and subnet is None) or subnet == '':
        return

    usage_error = ValueError('incorrect usage: ( --subnet ID | --subnet NAME --vnet-name NAME)')

    # A vnet name with no subnet to qualify is not a valid combination.
    if vnet_name and not subnet:
        raise usage_error

    subnet_is_id = is_valid_resource_id(subnet)

    # Exactly one of "subnet is already an id" and "vnet name supplied" must
    # hold: an id plus a vnet name is redundant, a bare name is ambiguous.
    if bool(subnet_is_id) == bool(vnet_name):
        raise usage_error

    if subnet_is_id:
        return

    # Subnet was given by name: expand it to a fully qualified resource id.
    namespace.subnet = resource_id(
        subscription=get_subscription_id(cmd.cli_ctx),
        resource_group=namespace.resource_group_name,
        namespace='Microsoft.Network',
        type='virtualNetworks',
        name=vnet_name,
        child_type_1='subnets',
        child_name_1=subnet)
resource_group_name):
'''
Validates elastic_pool_id is either None or a valid resource id.
If elastic_pool_id has a value but it is not a valid resource id,
then assume that user specified elastic pool name which we need to
convert to elastic pool id using the provided server & resource group
name.
Returns the elastic_pool_id, which may have been updated and may be None.
'''
from msrestazure.tools import resource_id, is_valid_resource_id
from azure.cli.core.commands.client_factory import get_subscription_id
if elastic_pool_id and not is_valid_resource_id(elastic_pool_id):
return resource_id(
subscription=get_subscription_id(cli_ctx),
resource_group=resource_group_name,
namespace='Microsoft.Sql',
type='servers',
name=server_name,
child_type_1='elasticPools',
child_name_1=elastic_pool_id)
return elastic_pool_id
def validate_storage_account(namespace):
    """
    Reduce namespace.storage_account to a bare name when it holds a resource id.

    If the value is a valid resource id, it is parsed and replaced in place
    by the 'resource_name' component of the parsed id. Any other value is
    left untouched.
    """
    from msrestazure.tools import parse_resource_id

    account = namespace.storage_account
    if not is_valid_resource_id(account):
        return

    namespace.storage_account = parse_resource_id(account)['resource_name']
def _ensure_aks_acr(cli_ctx,
client_id,
acr_name_or_id,
subscription_id, # pylint: disable=unused-argument
detach=False):
from msrestazure.tools import is_valid_resource_id, parse_resource_id
# Check if the ACR exists by resource ID.
if is_valid_resource_id(acr_name_or_id):
try:
parsed_registry = parse_resource_id(acr_name_or_id)
acr_client = cf_container_registry_service(cli_ctx, subscription_id=parsed_registry['subscription'])
registry = acr_client.registries.get(parsed_registry['resource_group'], parsed_registry['name'])
except CloudError as ex:
raise CLIError(ex.message)
_ensure_aks_acr_role_assignment(cli_ctx, client_id, registry.id, detach)
return
# Check if the ACR exists by name across all resource groups.
registry_name = acr_name_or_id
registry_resource = 'Microsoft.ContainerRegistry/registries'
try:
registry = get_resource_by_name(cli_ctx, registry_name, registry_resource)
except CloudError as ex:
if 'was not found' in ex.message:
def _validate_and_get_connection_string(cli_ctx, resource_group_name, storage_account):
sa_resource_group = resource_group_name
if is_valid_resource_id(storage_account):
sa_resource_group = parse_resource_id(storage_account)['resource_group']
storage_account = parse_resource_id(storage_account)['name']
storage_client = get_mgmt_service_client(cli_ctx, StorageManagementClient)
storage_properties = storage_client.storage_accounts.get_properties(sa_resource_group,
storage_account)
error_message = ''
endpoints = storage_properties.primary_endpoints
sku = storage_properties.sku.name
allowed_storage_types = ['Standard_GRS', 'Standard_LRS', 'Standard_ZRS', 'Premium_LRS']
for e in ['blob', 'queue', 'table']:
if not getattr(endpoints, e, None):
error_message = "Storage account '{}' has no '{}' endpoint. It must have table, queue, and blob endpoints all enabled".format(storage_account, e) # pylint: disable=line-too-long
if sku not in allowed_storage_types:
error_message += 'Storage type {} is not allowed'.format(sku)
aad_tenant_id=aad_tenant_id, identifier=None,
name=name, create=create_aad,
customer_admin_group_id=customer_admin_group_id)
identity_providers.append(
OpenShiftManagedClusterIdentityProvider(
name='Azure AD',
provider=osa_aad_identity
)
)
auth_profile = OpenShiftManagedClusterAuthProfile(identity_providers=identity_providers)
default_router_profile = OpenShiftRouterProfile(name='default')
if vnet_peer is not None:
from msrestazure.tools import is_valid_resource_id, resource_id
if not is_valid_resource_id(vnet_peer):
vnet_peer = resource_id(
subscription=get_subscription_id(cmd.cli_ctx),
resource_group=resource_group_name,
namespace='Microsoft.Network', type='virtualNetwork',
name=vnet_peer
)
network_profile = NetworkProfile(vnet_cidr=vnet_prefix, peer_vnet_id=vnet_peer)
osamc = OpenShiftManagedCluster(
location=location, tags=tags,
open_shift_version="v3.11",
network_profile=network_profile,
auth_profile=auth_profile,
agent_pool_profiles=agent_pool_profiles,
master_pool_profile=agent_master_pool_profile,
def _ensure_aks_acr(cli_ctx,
client_id,
acr_name_or_id,
subscription_id,
detach=False):
from msrestazure.tools import is_valid_resource_id, parse_resource_id
# Check if the ACR exists by resource ID.
if is_valid_resource_id(acr_name_or_id):
try:
parsed_registry = parse_resource_id(acr_name_or_id)
acr_client = cf_container_registry_service(cli_ctx, subscription_id=parsed_registry['subscription'])
registry = acr_client.registries.get(parsed_registry['resource_group'], parsed_registry['name'])
except CloudError as ex:
raise CLIError(ex.message)
_ensure_aks_acr_role_assignment(cli_ctx, client_id, registry.id, detach)
return
# Check if the ACR exists by name across all resource groups.
registry_name = acr_name_or_id
registry_resource = 'Microsoft.ContainerRegistry/registries'
try:
registry = get_resource_by_name(cli_ctx, registry_name, registry_resource)
except CloudError as ex:
if 'was not found' in ex.message: