Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
super(InquiryBranch, self).__init__(
Inquiry,
description,
app,
subparsers,
parent_parser=parent_parser,
read_only=True,
commands={'list': InquiryListCommand, 'get': InquiryGetCommand},
)
# Register extended commands
self.commands['respond'] = InquiryRespondCommand(self.resource, self.app, self.subparsers)
class InquiryListCommand(resource.ResourceCommand):
# Omitting "schema" and "response", as it doesn't really show up in a table well.
# The user can drill into a specific Inquiry to get this
display_attributes = ['id', 'roles', 'users', 'route', 'ttl']
def __init__(self, resource, *args, **kwargs):
self.default_limit = 50
super(InquiryListCommand, self).__init__(
resource,
'list',
'Get the list of the %s most recent %s.'
% (self.default_limit, resource.get_plural_display_name().lower()),
*args,
**kwargs
pk_argument_name = 'id'
class RoleAssignmentBranch(resource.ResourceBranch):
    """Read-only CLI branch for ``UserRoleAssignment`` exposing ``list`` and ``get``."""

    def __init__(self, description, app, subparsers, parent_parser=None):
        # Only the two read commands are registered; the branch itself is
        # flagged read-only when handed to the base class.
        command_map = {
            'list': RoleAssignmentListCommand,
            'get': RoleAssignmentGetCommand,
        }
        super(RoleAssignmentBranch, self).__init__(
            UserRoleAssignment,
            description,
            app,
            subparsers,
            parent_parser=parent_parser,
            read_only=True,
            commands=command_map,
        )
class RoleAssignmentListCommand(resource.ResourceCommand):
    """``list`` command for role assignments with role, user and remote filters."""

    display_attributes = ['id', 'role', 'user', 'is_remote', 'source', 'description']
    attribute_display_order = ROLE_ASSIGNMENT_ATTRIBUTE_DISPLAY_ORDER

    def __init__(self, resource, *args, **kwargs):
        plural_name = resource.get_plural_display_name().lower()
        super(RoleAssignmentListCommand, self).__init__(
            resource,
            'list',
            'Get the list of the %s.' % plural_name,
            *args,
            **kwargs
        )

        # Filter options, registered in the same order as they appear in help.
        filter_options = (
            (('-r', '--role'), {'help': 'Role to filter on.'}),
            (('-u', '--user'), {'help': 'User to filter on.'}),
            (
                ('--remote',),
                {
                    'action': 'store_true',
                    'help': 'Only display remote role assignments.',
                },
            ),
        )
        for flags, options in filter_options:
            self.parser.add_argument(*flags, **options)
# Display options
'rule.ref',
'trigger_instance_id',
'execution_id',
'failure_reason',
'enforced_at',
]
pk_argument_name = 'id'
@resource.add_auth_token_to_kwargs_from_cli
def run(self, args, **kwargs):
    """Look up a single resource by the id supplied in ``args``.

    The id is read from the attribute of ``args`` named by
    ``self.pk_argument_name``; ``None`` is used when that attribute is absent.
    """
    pk_attribute = self.pk_argument_name
    requested_id = getattr(args, pk_attribute, None)
    return self.get_resource_by_id(requested_id, **kwargs)
class RuleEnforcementListCommand(resource.ResourceCommand):
display_attributes = ['id', 'rule.ref', 'trigger_instance_id', 'execution_id', 'enforced_at']
attribute_display_order = [
'id',
'rule.ref',
'trigger_instance_id',
'execution_id',
'enforced_at',
]
attribute_transform_functions = {'enforced_at': format_isodate_for_user_timezone}
def __init__(self, resource, *args, **kwargs):
self.default_limit = 50
super(RuleEnforcementListCommand, self).__init__(
table.MultiColumnTable,
attributes=attr,
widths=args.width,
json=args.json,
yaml=args.yaml,
)
class ApiKeyGetCommand(resource.ResourceGetCommand):
    """Get-command subclass for API keys; the resource PK is read from ``key_or_id``."""

    # Name of the attribute which stores the resource PK.
    pk_argument_name = 'key_or_id'

    display_attributes = ['all']
    attribute_display_order = ['id', 'user', 'metadata']
class ApiKeyCreateCommand(resource.ResourceCommand):
def __init__(self, resource, *args, **kwargs):
super(ApiKeyCreateCommand, self).__init__(
resource,
'create',
'Create a new %s.' % resource.get_display_name().lower(),
*args,
**kwargs
)
self.parser.add_argument(
'-u', '--user', type=str, help='User for which to create API Keys.', default=''
)
self.parser.add_argument(
'-m',
'--metadata',
type=json.loads,
match, _ = self.manager.match(alias_match, **kwargs)
return [match]
def run_and_print(self, args, **kwargs):
    """Run the command and pass its result to ``print_output``.

    ``table.MultiColumnTable`` is passed as the formatter, together with the
    ``attr``, ``width``, ``json`` and ``yaml`` values taken from ``args``.
    """
    matches = self.run(args, **kwargs)
    output_options = {
        'attributes': args.attr,
        'widths': args.width,
        'json': args.json,
        'yaml': args.yaml,
    }
    self.print_output(matches, table.MultiColumnTable, **output_options)
class ActionAliasExecuteCommand(resource.ResourceCommand):
display_attributes = ['name']
def __init__(self, resource, *args, **kwargs):
super(ActionAliasExecuteCommand, self).__init__(
resource,
'execute',
(
'Execute the command text by finding a matching %s.'
% resource.get_display_name().lower()
),
*args,
**kwargs
)
self.parser.add_argument(
'command_text',
# Display order for role assignment attributes (used as an
# ``attribute_display_order`` by the role assignment commands).
ROLE_ASSIGNMENT_ATTRIBUTE_DISPLAY_ORDER = [
    'id',
    'role',
    'user',
    'is_remote',
    'description',
]
class RoleBranch(resource.ResourceBranch):
    """Read-only CLI branch for the ``Role`` resource exposing ``list`` and ``get``."""

    def __init__(self, description, app, subparsers, parent_parser=None):
        # Map of sub-command name to the command class handed to the base branch.
        read_commands = {
            'list': RoleListCommand,
            'get': RoleGetCommand,
        }
        super(RoleBranch, self).__init__(
            Role,
            description,
            app,
            subparsers,
            parent_parser=parent_parser,
            read_only=True,
            commands=read_commands,
        )
class RoleListCommand(resource.ResourceCommand):
    """``list`` command for roles with an optional system-roles-only filter."""

    display_attributes = ['id', 'name', 'system', 'description']
    attribute_display_order = ROLE_ATTRIBUTE_DISPLAY_ORDER

    def __init__(self, resource, *args, **kwargs):
        description = 'Get the list of the %s.' % resource.get_plural_display_name().lower()
        super(RoleListCommand, self).__init__(resource, 'list', description, *args, **kwargs)

        # Filter options are registered on a mutually exclusive group that is
        # kept on the instance as ``self.group``.
        self.group = self.parser.add_mutually_exclusive_group()
        self.group.add_argument(
            '-s',
            '--system',
            action='store_true',
            help='Only display system roles.',
        )
# Display options
'name',
'description',
'parameters_schema',
'payload_schema',
]
class TriggerTypeUpdateCommand(resource.ContentPackResourceUpdateCommand):
    """Update command for trigger types; adds nothing to its base class."""
class TriggerTypeDeleteCommand(resource.ContentPackResourceDeleteCommand):
    """Delete command for trigger types; adds nothing to its base class."""
class TriggerTypeSubTriggerCommand(resource.ResourceCommand):
attribute_display_order = [
'id',
'ref',
'context',
'parameters',
'status',
'start_timestamp',
'result',
]
def __init__(self, resource, *args, **kwargs):
super(TriggerTypeSubTriggerCommand, self).__init__(
resource,
kwargs.pop('name', 'getspecs'),
'Return Trigger Specifications of a Trigger.',
app,
subparsers,
parent_parser=parent_parser,
read_only=True,
commands={'list': PackListCommand, 'get': PackGetCommand},
)
self.commands['show'] = PackShowCommand(self.resource, self.app, self.subparsers)
self.commands['search'] = PackSearchCommand(self.resource, self.app, self.subparsers)
self.commands['install'] = PackInstallCommand(self.resource, self.app, self.subparsers)
self.commands['remove'] = PackRemoveCommand(self.resource, self.app, self.subparsers)
self.commands['register'] = PackRegisterCommand(self.resource, self.app, self.subparsers)
self.commands['config'] = PackConfigCommand(self.resource, self.app, self.subparsers)
class PackResourceCommand(resource.ResourceCommand):
def run_and_print(self, args, **kwargs):
try:
instance = self.run(args, **kwargs)
if not instance:
raise resource.ResourceNotFoundError("No matching items found")
self.print_output(
instance,
table.PropertyValueTable,
attributes=['all'],
json=args.json,
yaml=args.yaml,
)
except resource.ResourceNotFoundError:
print("No matching items found")
except Exception as e:
message = six.text_type(e)
def run_and_print(self, args, **kwargs):
    """Run the command and pass its result to ``print_output``.

    When ``args.detail`` is truthy the attributes come from
    ``self.detail_display_attributes``; otherwise ``args.attr`` is used.
    """
    results = self.run(args, **kwargs)

    if args.detail:
        shown_attributes = self.detail_display_attributes
    else:
        shown_attributes = args.attr

    self.print_output(
        results,
        table.MultiColumnTable,
        attributes=shown_attributes,
        widths=args.width,
        json=args.json,
        yaml=args.yaml,
    )
class ApiKeyGetCommand(resource.ResourceGetCommand):
    """Get-command subclass for API keys; the resource PK is read from ``key_or_id``."""

    # Name of the attribute which stores the resource PK.
    pk_argument_name = 'key_or_id'

    display_attributes = ['all']
    attribute_display_order = ['id', 'user', 'metadata']
class ApiKeyCreateCommand(resource.ResourceCommand):
def __init__(self, resource, *args, **kwargs):
    """Register the ``create`` command and its ``--user``, ``--metadata`` and ``--only-key`` options."""
    description = 'Create a new %s.' % resource.get_display_name().lower()
    super(ApiKeyCreateCommand, self).__init__(
        resource, 'create', description, *args, **kwargs
    )

    self.parser.add_argument(
        '-u',
        '--user',
        type=str,
        help='User for which to create API Keys.',
        default='',
    )
    # The metadata value is parsed from the command line with json.loads.
    self.parser.add_argument(
        '-m',
        '--metadata',
        type=json.loads,
        help='Optional metadata to associate with the API Keys.',
        default={},
    )
    self.parser.add_argument(
        '-k',
        '--only-key',
        action='store_true',
        dest='only_key',
        default=False,
        help='Only print API Key to the console on creation.',
    )
def run_and_print(self, args, **kwargs):
    """Run the command, then print either the bare token or the full result.

    When ``args.only_token`` is truthy only ``instance.token`` is printed;
    otherwise the instance goes to ``print_output`` with
    ``table.PropertyValueTable`` and ``self.display_attributes``.
    """
    instance = self.run(args, **kwargs)

    # Guard clause: token-only output skips the formatted rendering entirely.
    if args.only_token:
        print(instance.token)
        return

    render_options = {
        'attributes': self.display_attributes,
        'json': args.json,
        'yaml': args.yaml,
    }
    self.print_output(instance, table.PropertyValueTable, **render_options)
class LoginCommand(resource.ResourceCommand):
display_attributes = ['user', 'token', 'expiry']
def __init__(self, resource, *args, **kwargs):
kwargs['has_token_opt'] = False
super(LoginCommand, self).__init__(
resource,
kwargs.pop('name', 'create'),
'Authenticate user, acquire access token, and update CLI config directory',
*args,
**kwargs
)
self.parser.add_argument('username', help='Name of the user to authenticate.')