Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_prompt_msg_question_with_help_string(self, _):
expected_result = 'My response'
with mock.patch('knack.prompting._input', side_effect=['?', expected_result]):
with mock.patch('sys.stdout', new_callable=StringIO) as mock_stdout:
actual_result = prompt('Please enter some text: ', help_string='Anything you want!')
self.assertEqual(expected_result, actual_result)
self.assertIn('Anything you want!', mock_stdout.getvalue())
def _prompt_for_prop_input(prop_name, prop_type):
verify_is_a_tty_or_raise_error('The template requires a few inputs. These cannot be provided as in command '
'arguments. It can only be input interatively.')
val = prompt(msg='Please enter a value for {prop_name}: '.format(prop_name=prop_name),
help_string='Value of type {prop_type} is required.'.format(prop_type=prop_type))
print('')
return val
ext_repos.append(path)
display('Repo {} OK.'.format(path))
return True
# Determine extension repos
# Allows the user to simply press RETURN to use their cwd, assuming they are in their desired extension
# repo directory. To use multiple extension repos or identify a repo outside the cwd, they must specify
# the path.
if prompt_y_n('\nDo you plan to develop CLI extensions?'):
display('\nGreat! Input the paths for the extension repos you wish to develop for, one per '
'line. You can add as many repos as you like. (TIP: to quickly get started, press RETURN to '
'use your current working directory).')
first_repo = True
while True:
msg = '\nPath ({}): '.format('RETURN to use current directory' if first_repo else 'RETURN to continue')
ext_repo_path = prompt(msg, None)
if not ext_repo_path:
if first_repo and not add_ext_repo(os.getcwd()):
first_repo = False
continue
break
add_ext_repo(os.path.abspath(os.path.expanduser(ext_repo_path)))
first_repo = False
display('\nTIP: you can manage extension repos later with the `azdev extension repo` commands.')
# Determine extensions
if ext_repos:
if prompt_y_n('\nWould you like to install certain extensions by default? '):
display('\nGreat! Input the names of the extensions you wish to install, one per '
'line. You can add as many repos as you like. Press RETURN to continue to the next step.')
available_extensions = [x['name'] for x in list_extensions()]
if not config_exists or should_modify_global_config:
# no config exists yet so configure global config or user wants to modify global config
with ConfiguredDefaultSetter(config, False):
output_index = prompt_choice_list(MSG_PROMPT_GLOBAL_OUTPUT, OUTPUT_LIST,
default=get_default_from_config(config,
'core', 'output',
OUTPUT_LIST))
answers['output_type_prompt'] = output_index
answers['output_type_options'] = str(OUTPUT_LIST)
enable_file_logging = prompt_y_n(MSG_PROMPT_FILE_LOGGING, default='n')
allow_telemetry = prompt_y_n(MSG_PROMPT_TELEMETRY, default='y')
answers['telemetry_prompt'] = allow_telemetry
cache_ttl = None
while not cache_ttl:
try:
cache_ttl = prompt(MSG_PROMPT_CACHE_TTL) or DEFAULT_CACHE_TTL
# ensure valid int by casting
cache_value = int(cache_ttl)
if cache_value < 1:
raise ValueError
except ValueError:
logger.error('TTL must be a positive integer')
cache_ttl = None
# save the global config
config.set_value('core', 'output', OUTPUT_LIST[output_index]['name'])
config.set_value('core', 'collect_telemetry', 'yes' if allow_telemetry else 'no')
config.set_value('core', 'cache_ttl', cache_ttl)
config.set_value('logging', 'enable_log_file', 'yes' if enable_file_logging else 'no')
def prompt_not_empty(msg, help_string=None):
"""
Wrapper on knacks prompt function which does not return until non none value is recieved from user input.
"""
if not help_string:
help_string = 'This field cannot be left blank.'
user_input = None
while not user_input:
user_input = prompt(msg=msg, help_string=help_string)
return user_input
value = ''
no_tty = True
if value == '':
value = {} if param_type == 'object' else []
else:
try:
value = shell_safe_json_parse(value)
except Exception as ex: # pylint: disable=broad-except
logger.error(ex)
continue
result[param_name] = value
break
else:
try:
result[param_name] = prompt(prompt_str, help_string=description)
except NoTTYException:
result[param_name] = None
no_tty = True
break
if no_tty and fail_on_no_tty:
raise NoTTYException
return result
answers['login_index'] = method_index
answers['login_options'] = str(LOGIN_METHOD_LIST)
profile = Profile(cli_ctx=cli_ctx)
interactive = False
username = None
password = None
service_principal = None
tenant = None
if method_index == 0: # device auth
interactive = True
elif method_index == 1: # username and password
username = prompt('Username: ')
password = prompt_pass(msg='Password: ')
elif method_index == 2: # service principal with secret
service_principal = True
username = prompt('Service principal: ')
tenant = prompt('Tenant: ')
password = prompt_pass(msg='Client secret: ')
elif method_index == 3: # skip
return
try:
profile.find_subscriptions_on_login(
interactive,
username,
password,
service_principal,
tenant)
login_successful = True
logger.warning('Login successful!')
except AdalError as err:
logger.error('Login error!')
logger.error(err)
def process_github_repository(self):
while (
not self.github_repository or
not AzureDevopsBuildProvider.check_github_repository(self.github_pat, self.github_repository)
):
self.github_repository = prompt(msg="Github Repository Path (e.g. Azure/azure-cli): ").strip()
self.logger.warning("Successfully found Github repository.")
if registry:
if registry.admin_user_enabled:
logger.info("Attempting with admin credentials...")
try:
cred = cf_acr_registries(cli_ctx).list_credentials(resource_group_name, registry_name)
return login_server, cred.username, cred.passwords[0].value
except CLIError as e:
logger.warning("%s: %s", ADMIN_USER_BASE_ERROR_MESSAGE, str(e))
else:
logger.warning("%s: %s", ADMIN_USER_BASE_ERROR_MESSAGE, "Admin user is disabled.")
else:
logger.warning("%s: %s", ADMIN_USER_BASE_ERROR_MESSAGE, resource_not_found)
# 4. if we still don't have credentials, prompt the user
try:
username = prompt('Username: ')
password = prompt_pass(msg='Password: ')
username, password = _get_token_with_username_and_password(
login_server, username, password, repository, artifact_repository, permission, is_login_context
)
return login_server, username, password
except NoTTYException:
raise CLIError(
'Unable to authenticate using AAD or admin login credentials. ' +
'Please specify both username and password in non-interactive mode.')
return login_server, None, None