Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_out_json_byte_empty(self):
output_producer = OutputProducer(cli_ctx=self.mock_ctx)
output_producer.out(CommandResultItem({'active': True, 'contents': b''}),
formatter=format_json, out_file=self.io)
self.assertEqual(normalize_newlines(self.io.getvalue()), normalize_newlines(
"""{
"active": true,
def test_out_table_no_query_yes_jmespath_table_transformer(self):
output_producer = OutputProducer(cli_ctx=self.mock_ctx)
obj = {'name': 'qwerty', 'val': '0b1f6472qwerty', 'active': True, 'sub': '0b1f6472'}
result_item = CommandResultItem(obj,
table_transformer='{Name:name, Val:val, Active:active}',
is_query_active=False)
output_producer.out(result_item, formatter=format_table, out_file=self.io)
# Should be table transformer order
self.assertEqual(normalize_newlines(self.io.getvalue()), normalize_newlines(
"""Name Val Active
------ -------------- --------
def test_output_format_dict_sort(self):
obj = {}
obj['B'] = 1
obj['A'] = 2
result = format_tsv(CommandResultItem(obj))
self.assertEqual(result, '2\t1\n')
from azdev.utilities import IS_WINDOWS, display
# use default message if custom not provided
if message is True:
message = 'Running: {}\n'.format(command)
if message:
display(message)
try:
output = subprocess.check_output(
command.split(),
stderr=subprocess.STDOUT if show_stderr else None,
shell=IS_WINDOWS,
**kwargs).decode('utf-8').strip()
return CommandResultItem(output, exit_code=0, error=None)
except subprocess.CalledProcessError as err:
return CommandResultItem(err.output, exit_code=err.returncode, error=err)
# use default message if custom not provided
if message is True:
message = 'Running: {}\n'.format(command)
if message:
display(message)
try:
output = subprocess.check_output(
command.split(),
stderr=subprocess.STDOUT if show_stderr else None,
shell=IS_WINDOWS,
**kwargs).decode('utf-8').strip()
return CommandResultItem(output, exit_code=0, error=None)
except subprocess.CalledProcessError as err:
return CommandResultItem(err.output, exit_code=err.returncode, error=err)
def _combine_command_result(cli_result, ext_result):
final_result = CommandResultItem(None)
def apply_result(item):
if item:
final_result.exit_code += item.exit_code
if item.error:
if final_result.error:
try:
final_result.error.message += item.error.message
except AttributeError:
final_result.error.message += str(item.error)
else:
final_result.error = item.error
setattr(final_result.error, 'message', '')
if item.result:
if final_result.result:
final_result.result += item.result
def execute(self, args):
try:
return super(SFInvoker, self).execute(args)
# For exceptions happening while handling http requests, FabricErrorException is thrown with
# 'Internal Server Error' message, but here we handle the case where gateway is unable
# to find the service.
except TypeError:
if args[0] == 'events':
from knack.log import get_logger
logger = get_logger(__name__)
logger.error('Service is not installed.')
return CommandResultItem(None, exit_code=0)
raise
self.parser.cli_ctx = self.cli_ctx
self.parser.load_command_table(self.commands_loader)
self.cli_ctx.raise_event(EVENT_INVOKER_CMD_TBL_LOADED, cmd_tbl=self.commands_loader.command_table,
parser=self.parser)
arg_check = [a for a in args if a not in ['--debug', '--verbose']]
if not arg_check:
self.parser.enable_autocomplete()
subparser = self.parser.subparsers[tuple()]
self.help.show_welcome(subparser)
# TODO: No event in base with which to target
telemetry.set_command_details('az')
telemetry.set_success(summary='welcome')
return CommandResultItem(None, exit_code=0)
if args[0].lower() == 'help':
args[0] = '--help'
self.parser.enable_autocomplete()
self.cli_ctx.raise_event(EVENT_INVOKER_PRE_PARSE_ARGS, args=args)
parsed_args = self.parser.parse_args(args)
self.cli_ctx.raise_event(EVENT_INVOKER_POST_PARSE_ARGS, command=parsed_args.command, args=parsed_args)
# TODO: This fundamentally alters the way Knack.invocation works here. Cannot be customized
# with an event. Would need to be customized via inheritance.
cmd = parsed_args.func
self.cli_ctx.data['command'] = parsed_args.command
ids = getattr(parsed_args, '_ids', None) or [None] * len(jobs)
if self.cli_ctx.config.getboolean('core', 'disable_concurrent_ids', False) or len(ids) < 2:
results, exceptions = self._run_jobs_serially(jobs, ids)
else:
results, exceptions = self._run_jobs_concurrently(jobs, ids)
# handle exceptions
if len(exceptions) == 1 and not results:
ex, id_arg = exceptions[0]
raise ex
if exceptions:
for exception, id_arg in exceptions:
logger.warning('%s: "%s"', id_arg, str(exception))
if not results:
return CommandResultItem(None, exit_code=1, error=CLIError('Encountered more than one exception.'))
logger.warning('Encountered more than one exception.')
if results and len(results) == 1:
results = results[0]
event_data = {'result': results}
self.cli_ctx.raise_event(EVENT_INVOKER_FILTER_RESULT, event_data=event_data)
return CommandResultItem(
event_data['result'],
table_transformer=self.commands_loader.command_table[parsed_args.command].table_transformer,
is_query_active=self.data['query_active'])
from .util import CommandResultItem
if not isinstance(args, (list, tuple)):
raise TypeError('args should be a list or tuple.')
exit_code = 0
try:
args = self.completion.get_completion_args() or args
out_file = out_file or self.out_file
self.logging.configure(args)
logger.debug('Command arguments: %s', args)
self.raise_event(EVENT_CLI_PRE_EXECUTE)
if CLI._should_show_version(args):
self.show_version()
self.result = CommandResultItem(None)
else:
self.invocation = self.invocation_cls(cli_ctx=self,
parser_cls=self.parser_cls,
commands_loader_cls=self.commands_loader_cls,
help_cls=self.help_cls,
initial_data=initial_invocation_data)
cmd_result = self.invocation.execute(args)
self.result = cmd_result
exit_code = self.result.exit_code
output_type = self.invocation.data['output']
if cmd_result and cmd_result.result is not None:
formatter = self.output.get_formatter(output_type)
self.output.out(cmd_result, formatter=formatter, out_file=out_file)
self.raise_event(EVENT_CLI_POST_EXECUTE)
except KeyboardInterrupt as ex:
self.result = CommandResultItem(None, error=ex)