Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@auth_required(READ)
def GET(self, type_id):
    """
    Return every content unit of the requested type.

    The content query manager is searched with an empty ``Criteria()``; each
    unit found is passed through ``self.process_unit`` and the resulting list
    is sent back in an OK response.

    @param type_id: value handed to ``find_by_criteria`` to select the units
    """
    query_manager = factory.content_query_manager()
    found_units = query_manager.find_by_criteria(type_id, Criteria())
    processed = []
    for unit in found_units:
        processed.append(self.process_unit(unit))
    return self.ok(processed)
@auth_required(READ)
def GET(self, type_id):
    """
    Does a normal GET after setting the query method from the appropriate
    PulpCollection.

    Include query parameter "include_repos" with any value that evaluates to
    True to get the attribute "repository_memberships" added to each unit.

    @param type_id: id of a ContentType that we are searching.
    @type  type_id: basestring

    @return: OK response carrying the list of processed units
    """
    self._type_id = type_id
    # 'include_repos' is a flag for this handler rather than a search field,
    # so it is excluded from the query built from the request parameters.
    raw_units = self._get_query_results_from_get(ignore_fields=('include_repos',))
    units = [ContentUnitsCollection.process_unit(unit) for unit in raw_units]
    if web.input().get('include_repos'):
        self._add_repo_memberships(units, type_id)
    # Hand the units back to the client; without this the handler computed
    # the result and then returned None.
    return self.ok(units)
@auth_required(authorization.UPDATE)
def PUT(self, repo_group_id):
    """
    Apply the request parameters to a repo group and return the result.

    The parameters are forwarded as keyword arguments to the repo group
    manager's ``update_repo_group``; the returned group is extended with the
    link object for the current resource before being sent in an OK response.

    @param repo_group_id: identifies the group passed to the manager
    """
    delta = self.params()
    group_manager = managers_factory.repo_group_manager()
    updated_group = group_manager.update_repo_group(repo_group_id, **delta)
    self_link = serialization.link.current_link_obj()
    updated_group.update(self_link)
    return self.ok(updated_group)
@auth_required(UPDATE)
def POST(self, role_id):
    """
    Build the call request that adds a user to the given role.

    The user's login is read from the request parameters; a missing login
    raises ``exceptions.InvalidValue``. The request is tagged with the role
    resource and the 'add_user_to_role' action and is marked as updating the
    user resource.

    @param role_id: role the user is being added to
    """
    # Params (validation will occur in the manager)
    login = self.params().get('login')
    if login is None:
        raise exceptions.InvalidValue(login)

    role_manager = managers.role_manager()
    request_tags = [
        resource_tag(dispatch_constants.RESOURCE_ROLE_TYPE, role_id),
        action_tag('add_user_to_role'),
    ]
    call_request = CallRequest(
        role_manager.add_user_to_role, [role_id, login], tags=request_tags)
    call_request.updates_resource(dispatch_constants.RESOURCE_USER_TYPE, login)
    # NOTE(review): as visible here the handler stops without dispatching
    # call_request or returning a response -- confirm against the full source.
@error_handler
@auth_required(READ)
def GET(self, id):
'''
Returns a list of tasks associated with the CDS identified by id.
'''
# Find all sync tasks associated with the given CDS
tasks = [t for t in async.find_async(method_name='cds_sync')
if id in t.args]
if len(tasks) == 0:
return self.not_found('No sync tasks found for CDS [%s]' % id)
all_task_infos = []
for task in tasks:
info = self._task_to_dict(task)
all_task_infos.append(info)
@error_handler
@auth_required(UPDATE)
def POST(self, id):
"""
[[wiki]]
title: Cancel A Task
description: Cancel a waiting or running task.
method: POST
path: /tasks//cancel/
permission: UPDATE
success response: 202 Accepted
failure response: 404 Not Found
return: Task object
"""
tasks = async.find_async(id=id)
if not tasks:
return self.not_found(_('Task not found: %s') % id)
task = tasks[0]
@auth_required(DELETE)
def DELETE(self):
    """
    Call ``api.clean()`` and report success.

    @return: True on successful deletion of all filters
    """
    api.clean()
    deleted = True
    return self.ok(deleted)
@auth_required(READ)
def GET(self, name, version, release, epoch, arch):
    """
    Return the result of ``api.package_by_ivera`` for the given name,
    version, epoch, release and arch in an OK response.
    """
    # The API call receives epoch before release, which differs from the
    # order of this handler's own parameters.
    lookup_args = (name, version, epoch, release, arch)
    package = api.package_by_ivera(*lookup_args)
    return self.ok(package)
@auth_required(CREATE)
def POST(self):
    """
    Create an event listener from the request parameters.

    Reads 'notifier_type_id', 'notifier_config' and 'event_types' from the
    request (each is None when absent) and passes them to the event listener
    manager's ``create``. The created listener is extended with a child link
    built from its 'id', and the response is produced by ``self.created``
    using that link's '_href'.
    """
    # Parameters
    body = self.params()
    type_id = body.get('notifier_type_id')
    config = body.get('notifier_config')
    events = body.get('event_types')

    # Execution
    listener_manager = manager_factory.event_listener_manager()
    listener = listener_manager.create(type_id, config, events)

    listener_link = link.child_link_obj(listener['id'])
    listener.update(listener_link)

    return self.created(listener_link['_href'], listener)
@auth_required(READ)
def GET(self, role_id):
role = managers.role_query_manager().find_by_id(role_id)
if role is None:
raise exceptions.MissingResource(role_id)
role['users'] = [u['login'] for u in
managers.user_query_manager().find_users_belonging_to_role(role['id'])]
permissions_manager = managers.permission_manager()
# isolate schema change
resource_permission = {}
for item in role['permissions']:
resource = item['resource']
operations = item.get('permission', [])
resource_permission[resource] = [permissions_manager.operation_value_to_name(o)