Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _new_account(self):
    """Build a fresh subscription model whose state is set to ``enabled``.

    The ``Subscription`` and ``SubscriptionState`` model types are looked up
    through ``get_sdk`` using ``self.cli_ctx``.

    :return: A newly constructed subscription model instance.
    """
    from azure.cli.core.profiles import ResourceType, get_sdk

    subscription_cls, state_enum = get_sdk(
        self.cli_ctx,
        ResourceType.MGMT_RESOURCE_SUBSCRIPTIONS,
        'Subscription',
        'SubscriptionState',
        mod='models',
    )
    account = subscription_cls()
    account.state = state_enum.enabled
    return account
def generic_data_service_factory(cli_ctx, service, name=None, key=None, connection_string=None, sas_token=None,
                                 socket_timeout=None, token_credential=None):
    """Create a storage data service client, converting ``ValueError`` into ``CLIError``.

    All arguments are forwarded unchanged to ``get_storage_data_service_client``.

    :return: Whatever ``get_storage_data_service_client`` returns.
    :raises CLIError: When the underlying call raises ``ValueError``. If the
        error text equals the SDK's ``_ERROR_STORAGE_MISSING_INFO`` string it is
        replaced by ``MISSING_CREDENTIALS_ERROR_MESSAGE``; otherwise the original
        text is kept.
    """
    try:
        data_client = get_storage_data_service_client(
            cli_ctx, service, name, key, connection_string, sas_token,
            socket_timeout, token_credential)
    except ValueError as val_exception:
        # Look up the SDK's canonical "missing info" text to compare against.
        sdk_missing_info_text = get_sdk(
            cli_ctx, CUSTOM_DATA_STORAGE, 'common._error#_ERROR_STORAGE_MISSING_INFO')
        error_text = str(val_exception)
        raise CLIError(
            MISSING_CREDENTIALS_ERROR_MESSAGE if error_text == sdk_missing_info_text else error_text)
    return data_client
def _create_role_assignment(cli_ctx, role, assignee,
                            resource_group_name=None, scope=None, resolve_assignee=True):
    """Create a role assignment for ``assignee`` under a randomly generated name.

    :param cli_ctx: CLI context passed to the client factory and SDK lookups.
    :param role: Role passed to ``_resolve_role_id`` to obtain the role definition id.
    :param assignee: Principal to assign the role to; resolved to an object id
        unless ``resolve_assignee`` is false.
    :param resource_group_name: Passed to ``_build_role_scope``.
    :param scope: Passed to the client factory and to ``_build_role_scope``.
    :param resolve_assignee: When false, ``assignee`` is used as the principal id as-is.
    :return: The result of the role assignments client's ``create`` call.
    """
    from azure.cli.core.profiles import ResourceType, get_sdk

    auth_client = get_auth_management_client(cli_ctx, scope)
    assignments_client = auth_client.role_assignments
    definitions_client = auth_client.role_definitions

    scope = _build_role_scope(resource_group_name, scope, assignments_client.config.subscription_id)
    role_definition_id = _resolve_role_id(role, scope, definitions_client)

    # If the cluster has a service principal, resolve the service principal client id
    # to get the object id; if not, use the MSI object id directly.
    if resolve_assignee:
        principal_id = _resolve_object_id(cli_ctx, assignee)
    else:
        principal_id = assignee

    params_cls = get_sdk(
        cli_ctx,
        ResourceType.MGMT_AUTHORIZATION,
        'RoleAssignmentCreateParameters',
        mod='models',
        operation_group='role_assignments',
    )
    create_params = params_cls(role_definition_id=role_definition_id, principal_id=principal_id)
    new_assignment_name = uuid.uuid4()
    return assignments_client.create(scope, new_assignment_name, create_params, custom_headers=None)
def _default_certificate_profile(cli_ctx, subject):
CertificateAttributes = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.certificate_attributes#CertificateAttributes')
CertificatePolicy = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.certificate_policy#CertificatePolicy')
ActionType = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.key_vault_client_enums#ActionType')
KeyUsageType = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.key_vault_client_enums#KeyUsageType')
IssuerParameters = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.issuer_parameters#IssuerParameters')
KeyProperties = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.key_properties#KeyProperties')
LifetimeAction = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.lifetime_action#LifetimeAction')
SecretProperties = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.secret_properties#SecretProperties')
X509CertificateProperties = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.x509_certificate_properties#X509CertificateProperties')
Trigger = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.trigger#Trigger')
Action = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.action#Action')
template = CertificatePolicy(key_properties=KeyProperties(exportable=True,
key_type=u'RSA',
key_size=2048,
reuse_key=True),
secret_properties=SecretProperties(
content_type=u'application/x-pkcs12'),
x509_certificate_properties=X509CertificateProperties(key_usage=[KeyUsageType.c_rl_sign,
KeyUsageType.data_encipherment,
KeyUsageType.digital_signature,
KeyUsageType.key_encipherment,
KeyUsageType.key_agreement,
KeyUsageType.key_cert_sign],
subject=subject,
validity_in_months=12),
lifetime_actions=[LifetimeAction(trigger=Trigger(days_before_expiry=90),
def get_table_data_type(cli_ctx, module_name, *type_names):
    """Look up table data types via ``get_sdk``.

    ``ResourceType.DATA_COSMOS_TABLE`` is used when ``cosmosdb_table_exists(cli_ctx)``
    is truthy; otherwise ``CUSTOM_DATA_STORAGE`` is used.

    :param cli_ctx: CLI context forwarded to ``get_sdk``.
    :param module_name: Passed to ``get_sdk`` as ``mod``.
    :param type_names: Type names forwarded positionally to ``get_sdk``.
    :return: Whatever ``get_sdk`` returns for the requested names.
    """
    resource_type = ResourceType.DATA_COSMOS_TABLE if cosmosdb_table_exists(cli_ctx) else CUSTOM_DATA_STORAGE
    return get_sdk(cli_ctx, resource_type, *type_names, mod=module_name)
:param str vnet_gateway2: Name or ID of the destination virtual network gateway to connect to
using a 'Vnet2Vnet' connection.
:param str local_gateway2: Name or ID of the destination local network gateway to connect to
using an 'IPSec' connection.
:param str express_route_circuit2: Name or ID of the destination ExpressRoute to connect to
using an 'ExpressRoute' connection.
:param str authorization_key: The authorization key for the VPN connection.
:param bool enable_bgp: Enable BGP for this VPN connection.
:param bool no_wait: Do not wait for the long running operation to finish.
:param bool validate: Display and validate the ARM template but do not create any resources.
"""
from azure.cli.core.util import random_string
from azure.cli.command_modules.network._template_builder import \
ArmTemplateBuilder, build_vpn_connection_resource
DeploymentProperties = get_sdk(ResourceType.MGMT_RESOURCE_RESOURCES,
'DeploymentProperties', mod='models')
tags = tags or {}
# Build up the ARM template
master_template = ArmTemplateBuilder()
vpn_connection_resource = build_vpn_connection_resource(
connection_name, location, tags, vnet_gateway1,
vnet_gateway2 or local_gateway2 or express_route_circuit2,
connection_type, authorization_key, enable_bgp, routing_weight, shared_key,
use_policy_based_traffic_selectors)
master_template.add_resource(vpn_connection_resource)
master_template.add_output('resource', connection_name, output_type='object')
template = master_template.build()
# deploy ARM template
def import_certificate(cli_ctx, vault_base_url, certificate_name, certificate_data,
disabled=False, password=None, certificate_policy=None, tags=None):
CertificateAttributes = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.certificate_attributes#CertificateAttributes')
CertificatePolicy = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.certificate_policy#CertificatePolicy')
SecretProperties = get_sdk(cli_ctx, ResourceType.DATA_KEYVAULT, 'models.secret_properties#SecretProperties')
import binascii
certificate_data = open(certificate_data, 'rb').read()
x509 = None
content_type = None
try:
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, certificate_data)
# if we get here, we know it was a PEM file
content_type = 'application/x-pem-file'
try:
# for PEM files (including automatic endline conversion for
# Windows)
certificate_data = certificate_data.decode(
'utf-8').replace('\r\n', '\n')
except UnicodeDecodeError:
certificate_data = binascii.b2a_base64(
def _create_role_assignment(cli_ctx, role, assignee, resource_group_name=None, scope=None, resolve_assignee=True):
    """Assign ``role`` to ``assignee`` at the computed scope, using a random UUID as the name.

    :param cli_ctx: CLI context used for the client factory and SDK model lookup.
    :param role: Role resolved to a role definition id by ``_resolve_role_id``.
    :param assignee: Principal receiving the role; looked up with
        ``_resolve_object_id`` only when ``resolve_assignee`` is true.
    :param resource_group_name: Forwarded to ``_build_role_scope``.
    :param scope: Forwarded to the client factory and ``_build_role_scope``.
    :param resolve_assignee: Set to false to use ``assignee`` directly as the principal id.
    :return: The value returned by ``role_assignments.create``.
    """
    from azure.cli.core.profiles import ResourceType, get_sdk

    client_factory = get_auth_management_client(cli_ctx, scope)
    assignments_client = client_factory.role_assignments
    definitions_client = client_factory.role_definitions

    subscription_id = assignments_client.config.subscription_id
    scope = _build_role_scope(resource_group_name, scope, subscription_id)
    resolved_role_id = _resolve_role_id(role, scope, definitions_client)

    if resolve_assignee:
        target_object_id = _resolve_object_id(cli_ctx, assignee)
    else:
        target_object_id = assignee

    create_parameters_type = get_sdk(cli_ctx, ResourceType.MGMT_AUTHORIZATION,
                                     'RoleAssignmentCreateParameters', mod='models',
                                     operation_group='role_assignments')
    request_body = create_parameters_type(role_definition_id=resolved_role_id,
                                          principal_id=target_object_id)
    generated_name = uuid.uuid4()
    return assignments_client.create(scope, generated_name, request_body, custom_headers=None)
def _get_storage_prerequisites(cmd, storage_account, container):
    """Find a storage account by name (case-insensitive) and fetch its first key.

    NOTE(review): the visible excerpt ends after the key lookup, so nothing is
    returned here, and ``container`` and the page blob service type are not used
    in the visible part. Confirm against the full source.

    :param cmd: Command object; its ``cli_ctx`` is used for SDK and client lookups.
    :param storage_account: Name of the storage account to search for.
    :param container: Not used in the visible portion of this function.
    """
    t_page_blob_service = get_sdk(cmd.cli_ctx, ResourceType.DATA_STORAGE, 'blob.pageblobservice#PageBlobService')
    mgmt_client = get_mgmt_service_client(cmd.cli_ctx, ResourceType.MGMT_STORAGE)

    # First account whose name matches, ignoring case; stays None when there is no match.
    acc = None
    for candidate in mgmt_client.storage_accounts.list():
        if candidate.name.lower() == storage_account.lower():
            acc = candidate
            break

    account_key = None
    if acc:
        rg = parse_resource_id(acc.id)['resource_group']

        keys_model, list_keys_result_model = get_sdk(
            cmd.cli_ctx,
            ResourceType.MGMT_STORAGE,
            'models#StorageAccountKeys',
            'models#StorageAccountListKeysResult',
        )

        # Whichever model type get_sdk returns decides how the key is read from the result.
        if keys_model:
            account_key = mgmt_client.storage_accounts.list_keys(rg, storage_account).key1
        elif list_keys_result_model:
            account_key = mgmt_client.storage_accounts.list_keys(rg, storage_account).keys[0].value
def create_nw_packet_capture(client, resource_group_name, capture_name, vm,
                             watcher_rg, watcher_name, location=None,
                             storage_account=None, storage_path=None, file_path=None,
                             capture_size=None, capture_limit=None, time_limit=None, filters=None):
    """Create a packet capture through ``client.create``.

    ``resource_group_name`` and ``location`` are accepted but not used in this body.

    :param client: Object whose ``create`` method is called.
    :param capture_name: Name passed to ``client.create`` for the capture.
    :param vm: Used as the capture ``target``.
    :param watcher_rg: First argument passed to ``client.create``.
    :param watcher_name: Second argument passed to ``client.create``.
    :param storage_account: Used as the storage location's ``storage_id``.
    :param storage_path: Storage location ``storage_path``.
    :param file_path: Storage location ``file_path``.
    :param capture_size: Passed as ``bytes_to_capture_per_packet``.
    :param capture_limit: Passed as ``total_bytes_per_session``.
    :param time_limit: Passed as ``time_limit_in_seconds``.
    :param filters: Passed through as ``filters``.
    :return: The result of ``client.create``.
    """
    capture_model, storage_location_model = get_sdk(
        ResourceType.MGMT_NETWORK, 'PacketCapture', 'PacketCaptureStorageLocation', mod='models')

    destination = storage_location_model(
        storage_id=storage_account,
        storage_path=storage_path,
        file_path=file_path,
    )
    capture_request = capture_model(
        target=vm,
        storage_location=destination,
        bytes_to_capture_per_packet=capture_size,
        total_bytes_per_session=capture_limit,
        time_limit_in_seconds=time_limit,
        filters=filters,
    )
    return client.create(watcher_rg, watcher_name, capture_name, capture_request)