Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_prompt_y_n_yes(self, _):
    # Answering 'y' must make prompt_y_n return a truthy value.
    # The second parameter is unused here; presumably it is injected by a
    # patch decorator that sits above this method -- confirm in the full file.
    simulated_answer = 'y'
    # Substitute knack's low-level input reader so the prompt receives the
    # simulated answer instead of waiting on a real terminal.
    input_patch = mock.patch('knack.prompting._input', return_value=simulated_answer)
    with input_patch:
        accepted = prompt_y_n('Do you accept?')
        self.assertTrue(accepted)
# NOTE(review): this fragment has lost its indentation and begins/ends in the
# middle of an enclosing function; `first_repo`, `ext_repos` and `exts` are
# defined outside the visible lines. Nesting described below is inferred from
# the statement order -- confirm against the original file.
#
# Step 1: keep asking for extension repo paths until an empty answer is given.
while True:
# The hint inside the parentheses depends on whether a repo was handled yet.
msg = '\nPath ({}): '.format('RETURN to use current directory' if first_repo else 'RETURN to continue')
ext_repo_path = prompt(msg, None)
if not ext_repo_path:
# Empty answer on the first pass: try the current working directory. When
# add_ext_repo() returns a falsy value the user is asked again; otherwise
# (or on any later pass) the loop ends.
if first_repo and not add_ext_repo(os.getcwd()):
first_repo = False
continue
break
# Non-empty answer: expand '~' and make the path absolute before adding it.
add_ext_repo(os.path.abspath(os.path.expanduser(ext_repo_path)))
first_repo = False
display('\nTIP: you can manage extension repos later with the `azdev extension repo` commands.')
# Determine extensions
if ext_repos:
if prompt_y_n('\nWould you like to install certain extensions by default? '):
# NOTE(review): the text below says "as many repos as you like" although this
# step collects extension names -- looks like a copy/paste leftover; confirm.
display('\nGreat! Input the names of the extensions you wish to install, one per '
'line. You can add as many repos as you like. Use * to install all extensions. '
'Press RETURN to continue to the next step.')
# Names of the known extensions, used to validate each typed name.
available_extensions = [x['name'] for x in list_extensions()]
while True:
ext_name = prompt('\nName (RETURN to continue): ', None)
# Empty answer ends the name loop.
if not ext_name:
break
# '*' selects everything: take the path of every listed extension and stop.
if ext_name == '*':
exts = [x['path'] for x in list_extensions()]
break
# Unknown name: report it and ask again.
if ext_name not in available_extensions:
logger.error("Extension '%s' not found. Check the spelling, and make "
"sure you added the repo first!", ext_name)
continue
display('Extension {} OK.'.format(ext_name))
def _check_if_force_push_required(self, remote_url, remote_branches):
force_push_required = False
if remote_branches:
self.logger.warning("The remote repository is not clean: {url}".format(url=remote_url))
self.logger.warning("If you wish to continue, a force push will be commited and "
"your local branches will overwrite the remote branches!")
self.logger.warning("Please ensure you have force push permission in {repo} repository.".format(
repo=self.repository_name
))
if self.allow_force_push is None:
consent = prompt_y_n("I consent to force push all local branches to Azure DevOps repository")
else:
consent = str2bool(self.allow_force_push)
if not consent:
self.adbp.remove_git_remote(self.organization_name, self.project_name, self.repository_name)
raise CLIError("Failed to obtain your consent.")
force_push_required = True
return force_push_required
def _deprecate_warning():
    """Warn that the 'component' commands are deprecated and ask to continue.

    Two warnings are logged, then the user is asked a yes/no question whose
    default answer is 'n'. When no TTY is available (``NoTTYException``) the
    question is skipped and the function simply returns.

    :raises CLIError: when the user answers no.
    """
    from knack.prompting import NoTTYException, prompt_y_n

    notices = (
        "The 'component' commands will be deprecated in the future.",
        "az component and subcommands may not work unless the CLI is installed with pip.",
    )
    for notice in notices:
        logger.warning(notice)

    try:
        proceed = prompt_y_n("Are you sure you want to continue?", default='n')
    except NoTTYException:
        # Nobody can answer without a terminal; carry on unconfirmed.
        return

    if not proceed:
        raise CLIError('Operation cancelled.')
def workitem_delete_confirmation(command_args):
    """Ask whether the work item should really be deleted.

    :param command_args: accepted but not used by this function.
    :return: ``True`` when the yes/no prompt result is truthy, else ``False``.
    """
    answer = prompt_y_n('Are you sure you want to delete this work item?')
    return True if answer else False
def delete_role_assignments(cmd, ids=None, assignee=None, role=None, resource_group_name=None,
                            scope=None, include_inherited=False, yes=None):
    # Delete role assignments, either by explicit ids or by searching with
    # the given filters.
    #   ids: assignment ids to delete directly; cannot be combined with
    #        assignee / role / resource_group_name / scope / include_inherited.
    #   yes: any truthy value skips the confirmation prompt that is shown
    #        when no filter at all was supplied.
    # Raises CLIError when ids are mixed with other filters, or when the
    # search finds nothing to delete.
    client_factory = _auth_client_factory(cmd.cli_ctx, scope)
    assignments_client = client_factory.role_assignments
    definitions_client = client_factory.role_definitions

    if ids:
        other_filters = (assignee, role, resource_group_name, scope, include_inherited)
        if any(other_filters):
            raise CLIError('When assignment ids are used, other parameter values are not required')
        for assignment_id in ids:
            assignments_client.delete_by_id(assignment_id)
        return

    # No ids from here on. With no filter and no 'yes', the search below is
    # unfiltered, so ask before going on.
    nothing_specified = not (assignee or role or resource_group_name or scope or yes)
    if nothing_specified:
        from knack.prompting import prompt_y_n
        question = 'This will delete all role assignments under the subscription. Are you sure?'
        confirmed = prompt_y_n(question, default="n")
        if not confirmed:
            return

    subscription_id = assignments_client.config.subscription_id
    scope = _build_role_scope(resource_group_name, scope, subscription_id)
    matches = _search_role_assignments(cmd.cli_ctx, assignments_client, definitions_client,
                                       scope, assignee, role, include_inherited,
                                       include_groups=False)

    if not matches:
        raise CLIError('No matched assignments were found to delete')
    for match in matches:
        assignments_client.delete_by_id(match.id)
def process_organization(self):
    """Helper to retrieve information about an organization / create a new one

    When ``self.organization_name`` is already set it is handed to
    ``self.cmd_selector.cmd_organization``. Otherwise the user is asked
    whether to use an existing organization; answering no creates one and
    sets ``self.created_organization`` to ``True``.
    """
    if self.organization_name is not None:
        self.cmd_selector.cmd_organization(self.organization_name)
        return

    use_existing = prompt_y_n('Would you like to use an existing Azure DevOps organization? ')
    if use_existing:
        self._select_organization()
        return

    self._create_organization()
    self.created_organization = True
def process_project(self):
    """Helper to retrieve information about a project / create a new one

    A project name that is already set is handed to
    ``self.cmd_selector.cmd_project``. Without a name, a project is created
    straight away when the organization was just created; otherwise the user
    chooses between selecting an existing project and creating a new one.
    Finally a warning with the project URL is logged.
    """
    if self.project_name is not None:
        self.cmd_selector.cmd_project(self.organization_name, self.project_name)
    elif self.created_organization:
        # There is a new organization so a new project will be needed
        self._create_project()
    elif prompt_y_n('Would you like to use an existing Azure DevOps project? '):
        self._select_project()
    else:
        self._create_project()

    view_hint = ("To view your Azure DevOps project, "
                 "please visit https://dev.azure.com/{org}/{proj}").format(
                     org=self.organization_name,
                     proj=self.project_name
                 )
    self.logger.warning(view_hint)
def _check_if_git_credential_required(self):
    """Ask the user to set up alternate credentials when they are needed.

    Returns immediately when
    ``AzureDevopsBuildProvider.check_git_credential_manager()`` is truthy.
    Otherwise instructions are logged and the user must confirm that the
    credentials have been set up.

    :raises CLIError: when the user does not confirm; the git remote is
        removed via ``self.adbp.remove_git_remote`` before raising.
    """
    # Username and password are not required if git credential manager exists
    has_credential_manager = AzureDevopsBuildProvider.check_git_credential_manager()
    if has_credential_manager:
        return

    # Manual setup alternative credential in Azure Devops
    settings_notice = "Please visit https://dev.azure.com/{org}/_usersSettings/altcreds".format(
        org=self.organization_name,
    )
    self.logger.warning(settings_notice)
    self.logger.warning('Check "Enable alternate authentication credentials" and save your username and password.')
    self.logger.warning("You may need to use this credential when pushing your code to Azure DevOps repository.")

    question = "I have setup alternative authentication credentials for {repo}".format(
        repo=self.repository_name
    )
    if prompt_y_n(question):
        return

    # No confirmation: undo the remote that was set up, then abort.
    self.adbp.remove_git_remote(self.organization_name, self.project_name, self.repository_name)
    raise CLIError("Failed to obtain your consent.")
# Delete the repair resource group by running an 'az group delete' command,
# optionally asking the user for confirmation first.
#   resource_group_name: name inserted into the delete command and messages.
#   confirm: when truthy, the resource ids in the group are logged and the
#            user is prompted before anything is deleted.
# NOTE(review): indentation was lost in this copy; the nesting described in
# the comments below is inferred from the statement order.
def _clean_up_resources(resource_group_name, confirm):
try:
if confirm:
# Show what is about to be removed before asking.
message = 'The clean-up will remove the resource group \'{rg}\' and all repair resources within:\n\n{r}' \
.format(rg=resource_group_name, r='\n'.join(_list_resource_ids_in_rg(resource_group_name)))
logger.warning(message)
# A declined prompt leaves the function without deleting anything.
if not prompt_y_n('Continue with clean-up and delete resources?'):
logger.warning('Skipping clean-up')
return
# The command is issued with --yes and --no-wait.
delete_resource_group_command = 'az group delete --name {name} --yes --no-wait'.format(name=resource_group_name)
logger.info('Cleaning up resources by deleting repair resource group \'%s\'...', resource_group_name)
_call_az_command(delete_resource_group_command)
# NoTTYException exception only thrown from confirm block
except NoTTYException:
logger.warning('Cannot confirm clean-up resouce in non-interactive mode.')
logger.warning('Skipping clean-up')
return
except AzCommandError as azCommandError:
# Only way to distinguish errors
resource_not_found_error_string = 'could not be found'
if resource_not_found_error_string in str(azCommandError):
logger.info('Resource group not found. Skipping clean up.')
# NOTE(review): the visible chunk ends here. No visible line handles an
# AzCommandError whose text lacks the marker above -- the handler may continue
# beyond this point; confirm against the full file.