Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
indicator.finish_stage(status, name)
if execution and execution.status == LIVEACTION_STATUS_FAILED:
args.depth = 1
self._print_execution_details(execution=execution, args=args, **kwargs)
sys.exit(1)
return self.app.client.managers['Execution'].get_by_id(parent_id, **kwargs)
class PackListCommand(resource.ResourceListCommand):
display_attributes = ['ref', 'name', 'description', 'version', 'author']
attribute_display_order = ['ref', 'name', 'description', 'version', 'author']
class PackGetCommand(resource.ResourceGetCommand):
pk_argument_name = 'ref'
display_attributes = ['name', 'version', 'author', 'email', 'keywords', 'description']
attribute_display_order = ['name', 'version', 'author', 'email', 'keywords', 'description']
help_string = 'Get information about an installed pack.'
class PackShowCommand(PackResourceCommand):
def __init__(self, resource, *args, **kwargs):
help_string = (
'Get information about an available %s from the index.'
% resource.get_display_name().lower()
)
super(PackShowCommand, self).__init__(resource, 'show', help_string, *args, **kwargs)
self.parser.add_argument(
'pack', help='Name of the %s to show.' % resource.get_display_name().lower()
attribute_transform_functions=self.attribute_transform_functions,
)
else:
self.print_output(
instances,
table.MultiColumnTable,
attributes=args.attr,
widths=args.width,
attribute_transform_functions=self.attribute_transform_functions,
)
if args.last and count and count > args.last:
table.SingleRowTable.note_box(self.resource_name, args.last)
class TraceGetCommand(resource.ResourceGetCommand, SingleTraceDisplayMixin):
display_attributes = ['all']
attribute_display_order = TRACE_ATTRIBUTE_DISPLAY_ORDER
attribute_transform_functions = {'start_timestamp': format_isodate_for_user_timezone}
pk_argument_name = 'id'
def __init__(self, resource, *args, **kwargs):
super(TraceGetCommand, self).__init__(resource, *args, **kwargs)
# Causation chains
self.causation_group = self.parser.add_mutually_exclusive_group()
self.causation_group.add_argument(
'-e', '--execution', help='Execution to show causation chain.'
)
self.causation_group.add_argument('-r', '--rule', help='Rule to show causation chain.')
),
required=False,
)
@resource.add_auth_token_to_kwargs_from_cli
def run(self, args, **kwargs):
if args.timer_type:
kwargs['timer_type'] = args.timer_type
if kwargs:
return self.manager.query(**kwargs)
else:
return self.manager.get_all(**kwargs)
class TimerGetCommand(resource.ResourceGetCommand):
display_attributes = ['all']
attribute_display_order = ['type', 'pack', 'name', 'description', 'parameters']
return result
def run_and_print(self, args, **kwargs):
instances = self.run(args, **kwargs)
self.print_output(
instances,
table.MultiColumnTable,
attributes=args.attr,
widths=args.width,
json=args.json,
yaml=args.yaml,
)
class RoleGetCommand(resource.ResourceGetCommand):
display_attributes = ['all']
attribute_display_order = ROLE_ATTRIBUTE_DISPLAY_ORDER
pk_argument_name = 'id'
class RoleAssignmentBranch(resource.ResourceBranch):
def __init__(self, description, app, subparsers, parent_parser=None):
super(RoleAssignmentBranch, self).__init__(
UserRoleAssignment,
description,
app,
subparsers,
parent_parser=parent_parser,
read_only=True,
commands={'list': RoleAssignmentListCommand, 'get': RoleAssignmentGetCommand},
)
# handling for filters in the get method which reuqires this odd hack.
if args.show_secrets:
params = filters.get('params', {})
params['show_secrets'] = True
filters['params'] = params
return self.manager.get_all(**filters)
def run_and_print(self, args, **kwargs):
instances = self.run(args, **kwargs)
attr = self.detail_display_attributes if args.detail else args.attr
self.print_output(instances, table.MultiColumnTable,
attributes=attr, widths=args.width,
json=args.json, yaml=args.yaml)
class ApiKeyGetCommand(resource.ResourceGetCommand):
display_attributes = ['all']
attribute_display_order = ['id', 'user', 'metadata']
pk_argument_name = 'key_or_id' # name of the attribute which stores resource PK
class ApiKeyCreateCommand(resource.ResourceCommand):
def __init__(self, resource, *args, **kwargs):
super(ApiKeyCreateCommand, self).__init__(
resource, 'create', 'Create a new %s.' % resource.get_display_name().lower(),
*args, **kwargs)
self.parser.add_argument('-u', '--user', type=str,
help='User for which to create API Keys.',
default='')
def run_and_print(self, args, **kwargs):
instances, count = self.run(args, **kwargs)
self.print_output(
reversed(instances),
table.MultiColumnTable,
attributes=args.attr,
widths=args.width,
json=args.json,
yaml=args.yaml,
)
if args.last and count and count > args.last:
table.SingleRowTable.note_box(self.resource_name, args.last)
class InquiryGetCommand(resource.ResourceGetCommand):
pk_argument_name = 'id'
display_attributes = ['id', 'roles', 'users', 'route', 'ttl', 'schema']
def __init__(self, kv_resource, *args, **kwargs):
super(InquiryGetCommand, self).__init__(kv_resource, *args, **kwargs)
@resource.add_auth_token_to_kwargs_from_cli
def run(self, args, **kwargs):
resource_name = getattr(args, self.pk_argument_name, None)
return self.get_resource_by_id(id=resource_name, **kwargs)
class InquiryRespondCommand(resource.ResourceCommand):
display_attributes = ['id', 'response']
def __init__(self, resource, *args, **kwargs):
from st2client.utils.date import format_isodate_for_user_timezone
class RuleEnforcementBranch(resource.ResourceBranch):
def __init__(self, description, app, subparsers, parent_parser=None):
super(RuleEnforcementBranch, self).__init__(
models.RuleEnforcement,
description,
app,
subparsers,
parent_parser=parent_parser,
commands={'list': RuleEnforcementListCommand, 'get': RuleEnforcementGetCommand},
)
class RuleEnforcementGetCommand(resource.ResourceGetCommand):
display_attributes = [
'id',
'rule.ref',
'trigger_instance_id',
'execution_id',
'failure_reason',
'enforced_at',
]
attribute_display_order = [
'id',
'rule.ref',
'trigger_instance_id',
'execution_id',
'failure_reason',
'enforced_at',
]
if args.system:
result = self.manager.query(**kwargs)
else:
result = self.manager.get_all(**kwargs)
return result
def run_and_print(self, args, **kwargs):
instances = self.run(args, **kwargs)
self.print_output(instances, table.MultiColumnTable,
attributes=args.attr, widths=args.width,
json=args.json, yaml=args.yaml)
class RoleGetCommand(resource.ResourceGetCommand):
display_attributes = ['all']
attribute_display_order = ROLE_ATTRIBUTE_DISPLAY_ORDER
pk_argument_name = 'id'
class RoleAssignmentBranch(resource.ResourceBranch):
def __init__(self, description, app, subparsers, parent_parser=None):
super(RoleAssignmentBranch, self).__init__(
UserRoleAssignment, description, app, subparsers,
parent_parser=parent_parser,
read_only=True,
commands={
'list': RoleAssignmentListCommand,
'get': RoleAssignmentGetCommand
})
return self.manager.get_all(**filters)
def run_and_print(self, args, **kwargs):
instances = self.run(args, **kwargs)
attr = self.detail_display_attributes if args.detail else args.attr
self.print_output(
instances,
table.MultiColumnTable,
attributes=attr,
widths=args.width,
json=args.json,
yaml=args.yaml,
)
class ApiKeyGetCommand(resource.ResourceGetCommand):
display_attributes = ['all']
attribute_display_order = ['id', 'user', 'metadata']
pk_argument_name = 'key_or_id' # name of the attribute which stores resource PK
class ApiKeyCreateCommand(resource.ResourceCommand):
def __init__(self, resource, *args, **kwargs):
super(ApiKeyCreateCommand, self).__init__(
resource,
'create',
'Create a new %s.' % resource.get_display_name().lower(),
*args,
**kwargs
)
dest='resource_type',
help='Return policy types for the resource type.',
)
@resource.add_auth_token_to_kwargs_from_cli
def run(self, args, **kwargs):
if args.resource_type:
filters = {'resource_type': args.resource_type}
filters.update(**kwargs)
instances = self.manager.query(**filters)
return instances
else:
return self.manager.get_all(**kwargs)
class PolicyTypeGetCommand(resource.ResourceGetCommand):
pk_argument_name = 'ref_or_id'
def get_resource(self, ref_or_id, **kwargs):
return self.get_resource_by_ref_or_id(ref_or_id=ref_or_id, **kwargs)
class PolicyBranch(resource.ResourceBranch):
def __init__(self, description, app, subparsers, parent_parser=None):
super(PolicyBranch, self).__init__(
models.Policy,
description,
app,
subparsers,
parent_parser=parent_parser,
commands={
'list': PolicyListCommand,