# Example: creating a SQL VM availability group listener and blocking until the
# operation completes. The source snippet starts at the tail of the private IP
# address construction, so the assignment head below is reconstructed and its
# earlier arguments are omitted.
private_ip_object = PrivateIPAddress(
    subnet_resource_id=subnet_resource_id if is_valid_resource_id(subnet_resource_id) else None)

# Create the load balancer configurations
load_balancer_object = LoadBalancerConfiguration(private_ip_address=private_ip_object,
                                                 public_ip_address_resource_id=public_ip_address_resource_id,
                                                 load_balancer_resource_id=load_balancer_resource_id,
                                                 probe_port=probe_port,
                                                 sql_virtual_machine_instances=sql_virtual_machine_instances)

# Create the availability group listener object
ag_listener_object = AvailabilityGroupListener(availability_group_name=availability_group_name,
                                               load_balancer_configurations=[load_balancer_object],
                                               port=port)

LongRunningOperation(cmd.cli_ctx)(sdk_no_wait(False, client.create_or_update, resource_group_name,
                                              sql_virtual_machine_group_name, availability_group_listener_name,
                                              ag_listener_object))

return client.get(resource_group_name, sql_virtual_machine_group_name, availability_group_listener_name)
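
# A minimal, self-contained sketch of the pattern the example above relies on:
# sdk_no_wait starts the service call and LongRunningOperation blocks on the
# returned poller. The function and parameter names here are placeholders, not
# names from the original source.
from azure.cli.core.commands import LongRunningOperation
from azure.cli.core.util import sdk_no_wait


def create_resource_and_wait(cmd, client, resource_group_name, resource_name, payload, no_wait=False):
    # sdk_no_wait always invokes the operation; when no_wait is True it passes
    # polling=False to the SDK so the unfinished poller comes back immediately.
    poller = sdk_no_wait(no_wait, client.create_or_update,
                         resource_group_name, resource_name, payload)
    if no_wait:
        return poller
    # Block until the long-running operation completes and return its result.
    return LongRunningOperation(cmd.cli_ctx)(poller)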

# Example: opening a port on a VM by creating a network security group, adding
# an inbound rule, and attaching the NSG to the NIC or subnet. The source
# snippet starts at the tail of the NSG creation call, so the assignment
# wrapper on the first line below is reconstructed.
nsg = LongRunningOperation(cmd.cli_ctx)(
    network.network_security_groups.create_or_update(
        resource_group_name=resource_group_name,
        network_security_group_name=network_security_group_name,
        parameters=NetworkSecurityGroup(location=location)
    )
)
created_nsg = True

# update the NSG with the new rule to allow inbound traffic
SecurityRule = cmd.get_models('SecurityRule', resource_type=ResourceType.MGMT_NETWORK)
rule_name = 'open-port-all' if port == '*' else 'open-port-{}'.format(port)
rule = SecurityRule(protocol='*', access='allow', direction='inbound', name=rule_name,
                    source_port_range='*', destination_port_range=port, priority=priority,
                    source_address_prefix='*', destination_address_prefix='*')
nsg_name = nsg.name or os.path.split(nsg.id)[1]
LongRunningOperation(cmd.cli_ctx, 'Adding security rule')(
    network.security_rules.create_or_update(
        resource_group_name, nsg_name, rule_name, rule)
)

# update the NIC or subnet if a new NSG was created
if created_nsg and not apply_to_subnet:
    nic.network_security_group = nsg
    LongRunningOperation(cmd.cli_ctx, 'Updating NIC')(network.network_interfaces.create_or_update(
        resource_group_name, nic.name, nic))
elif created_nsg and apply_to_subnet:
    subnet.network_security_group = nsg
    LongRunningOperation(cmd.cli_ctx, 'Updating subnet')(network.subnets.create_or_update(
        resource_group_name=resource_group_name,
        virtual_network_name=subnet_id['name'],
        subnet_name=subnet_id['child_name_1'],
        subnet_parameters=subnet))

# Example: attaching a Key Vault certificate to a VM scale set's OS profile and
# waiting for the scale set update. The source snippet begins partway through an
# if/else chain whose `if` heads were cut off above, so only the else branches
# survive and their relative nesting below is approximate.
        else:
            return
    else:
        secrets[0].vault_certificates = []
    secrets[0].vault_certificates.append(
        VaultCertificate(secret_url, 'my'))
else:
    vmss.virtual_machine_profile.os_profile.secrets = []
    new_vault_certificates = []
    new_vault_certificates.append(VaultCertificate(secret_url, 'my'))
    vmss.virtual_machine_profile.os_profile.secrets.append(VaultSecretGroup(SubResource(vault_id),
                                                                            new_vault_certificates))

poller = compute_client.virtual_machine_scale_sets.create_or_update(
    resource_group_name, vmss.name, vmss)
return LongRunningOperation(cli_ctx)(poller)

# Example: ensuring the subnet exposes the Microsoft.Web service endpoint,
# adding it if missing, and waiting for the subnet update to finish.
subnet_obj = vnet_client.subnets.get(subnet_resource_group, subnet_vnet_name, subnet_name)
subnet_obj.service_endpoints = subnet_obj.service_endpoints or []
service_endpoint_exists = False
for s in subnet_obj.service_endpoints:
    if s.service == "Microsoft.Web":
        service_endpoint_exists = True
        break
if not service_endpoint_exists:
    web_service_endpoint = ServiceEndpointPropertiesFormat(service="Microsoft.Web")
    subnet_obj.service_endpoints.append(web_service_endpoint)
    poller = vnet_client.subnets.create_or_update(
        subnet_resource_group, subnet_vnet_name,
        subnet_name, subnet_parameters=subnet_obj)
    # Ensure subnet is updated to avoid update conflict
    LongRunningOperation(cli_ctx)(poller)

# Example: invoking an SDK operation from a command handler and normalizing the
# result (transform, long-running poller, or paged collection) before returning.
if client_arg_name in op_args:
    client = client_factory(self.command_loader.cli_ctx, command_args)
    command_args[client_arg_name] = client
if 'cmd' not in op_args:
    command_args.pop('cmd')

try:
    result = op(**command_args)

    # apply results transform if specified
    transform_result = merged_kwargs.get('transform', None)
    if transform_result:
        return _encode_hex(transform_result(result))

    # otherwise handle based on return type of results
    if isinstance(result, poller_classes()):
        return _encode_hex(
            LongRunningOperation(self.command_loader.cli_ctx, 'Starting {}'.format(name))(result))
    if isinstance(result, Paged):
        try:
            return _encode_hex(list(result))
        except TypeError:
            # TODO: Workaround for an issue in either KeyVault server-side or msrest
            # See https://github.com/Azure/autorest/issues/1309
            return []
    return _encode_hex(result)
except Exception as ex:  # pylint: disable=broad-except
    if name == 'show':
        # show_exception_handler needs to be called before the keyvault_exception_handler
        from azure.cli.core.commands.arm import show_exception_handler
        try:
            show_exception_handler(ex)
        except Exception:  # pylint: disable=broad-except
            pass

# Example: adding a custom registry credential to an ACR task and waiting for
# the task update to complete.
if login_server in existingCreds:
    raise CLIError("Login server '{}' already exists. You cannot add it again.".format(login_server))

TaskUpdateParameters = cmd.get_models('TaskUpdateParameters')
taskUpdateParameters = TaskUpdateParameters(
    credentials=get_custom_registry_credentials(
        cmd=cmd,
        login_server=login_server,
        username=username,
        password=password,
        identity=use_identity
    )
)

resp = LongRunningOperation(cmd.cli_ctx)(
    client.update(resource_group_name, registry_name, task_name, taskUpdateParameters)
)
resp = resp.credentials
return {} if not resp else resp.custom_registries

# Example: a setter callback that assigns a system-assigned managed identity to
# a web app and blocks until the site update finishes.
def setter(webapp):
    webapp.identity = ManagedServiceIdentity(type='SystemAssigned')
    poller = _generic_site_operation(cmd.cli_ctx, resource_group_name, name, 'create_or_update', slot, webapp)
    return LongRunningOperation(cmd.cli_ctx)(poller)

# Example: creating an AKS cluster, retrying around SPN replication latency, and
# adding the Monitoring Metrics Publisher role assignment once the cluster exists.
headers = get_aks_custom_headers(aks_custom_headers)

# Due to SPN replication latency, we do a few retries here
max_retry = 30
retry_exception = Exception(None)
for _ in range(0, max_retry):
    try:
        logger.info('AKS cluster is creating, please wait...')

        # some addons require post cluster creation role assignment
        need_post_creation_role_assignment = monitoring or ingress_appgw_addon_enabled
        if need_post_creation_role_assignment:
            # adding a wait here since we rely on the result for role assignment
            created_cluster = LongRunningOperation(cmd.cli_ctx)(client.create_or_update(
                resource_group_name=resource_group_name,
                resource_name=name,
                parameters=mc,
                custom_headers=headers))
            cloud_name = cmd.cli_ctx.cloud.name

            # add cluster spn/msi Monitoring Metrics Publisher role assignment to publish metrics to MDM
            # mdm metrics is supported only in azure public cloud, so add the role assignment only in this cloud
            if monitoring and cloud_name.lower() == 'azurecloud':
                from msrestazure.tools import resource_id
                cluster_resource_id = resource_id(
                    subscription=subscription_id,
                    resource_group=resource_group_name,
                    namespace='Microsoft.ContainerService', type='managedClusters',
                    name=name
                )
                _add_monitoring_role_assignment(created_cluster, cluster_resource_id, cmd)

# Example: adding a node type to a Service Fabric cluster and waiting for the
# cluster patch to complete.
def _add_node_type_to_sfrp(cmd, client, resource_group_name, cluster_name, cluster, node_type_name, capacity, durability_level):
    cluster.node_types.append(NodeTypeDescription(name=node_type_name,
                                                  client_connection_endpoint_port=DEFAULT_CLIENT_CONNECTION_ENDPOINT,
                                                  http_gateway_endpoint_port=DEFAULT_HTTP_GATEWAY_ENDPOINT,
                                                  is_primary=False,
                                                  vm_instance_count=int(capacity),
                                                  durability_level=durability_level,
                                                  application_ports=EndpointRangeDescription(
                                                      start_port=DEFAULT_APPLICATION_START_PORT,
                                                      end_port=DEFAULT_APPLICATION_END_PORT),
                                                  ephemeral_ports=EndpointRangeDescription(
                                                      start_port=DEFAULT_EPHEMERAL_START,
                                                      end_port=DEFAULT_EPHEMERAL_END)))

    patch_request = ClusterUpdateParameters(node_types=cluster.node_types)
    poller = client.update(resource_group_name, cluster_name, patch_request)
    LongRunningOperation(cmd.cli_ctx)(poller)
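
# A short, hedged sketch of the other LongRunningOperation variant the examples
# above use: wrapping an SDK poller directly, with the second argument serving
# as the progress message while the operation runs. `network_client` and the
# function name are placeholders, not names from the original source.
from azure.cli.core.commands import LongRunningOperation


def attach_nsg_and_wait(cmd, network_client, resource_group_name, nic, nsg):
    # Point the NIC at the network security group, start the update, and block
    # on the poller; the completed network interface resource is returned.
    nic.network_security_group = nsg
    poller = network_client.network_interfaces.create_or_update(
        resource_group_name, nic.name, nic)
    return LongRunningOperation(cmd.cli_ctx, 'Updating NIC')(poller)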