Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# encoding: utf-8
#
# This file is part of ckanext-nhm
# Created by the Natural History Museum in London, UK
import logging
from ckanext.datastore.db import _get_engine
import ckan.model as model
from ckan.plugins import toolkit
# Module-scoped logger. Calling getLogger() without a name returns the root
# logger, which makes this module's records indistinguishable from everyone
# else's and bypasses any per-package logger configuration.
log = logging.getLogger(__name__)
class DatastoreCommand(toolkit.CkanCommand):
    '''Datastore commands, for modifying CKAN datastore resources
Commands:
purge-all - Delete all datasets and datastore tables
paster datastore purge-all -c /etc/ckan/default/development.ini
replace - Update datastore aliases
paster datastore replace -i [resource_id] -t [table] -c /etc/ckan/default/development.ini
Required param: ID of the datastore resource to update
Required param: str alias
'''
    # The class docstring doubles as runtime text: it is exposed verbatim as
    # the command's usage message, so its wording and layout are left as-is.
    usage = __doc__
    # The summary is everything before the first newline of that same text.
    summary = usage.partition(u'\n')[0]
def render_record(self, c):
    """
    Populate ``c.field_data`` for a record and render the record view.

    ``c.field_data`` is set to an ``OrderedDict`` with one entry per field
    name returned by ``self.get_ordered_fields`` for ``c.resource['id']``,
    skipping names that start with an underscore. Fields missing from
    ``c.record_dict`` are stored as ``None``.

    :param c: context object providing ``resource`` and ``record_dict``
    :returns: the result of rendering ``record/view.html``
    """
    # record_dict does not keep the fields in the right order, so the ordered
    # field list drives the iteration and the values are looked up per field.
    ordered_values = c.field_data = OrderedDict()
    for name in self.get_ordered_fields(c.resource['id']):
        if name.startswith('_'):
            continue
        ordered_values[name] = c.record_dict.get(name, None)
    return p.toolkit.render('record/view.html')
def issue_exists(issue_id, context):
    """
    Check that ``issue_id`` refers to an existing issue.

    :param issue_id: candidate id; first passed through ``is_positive_integer``
    :param context: context dict; ``context['session']`` is used for the lookup
    :returns: the id as returned by ``is_positive_integer``
    :raises toolkit.Invalid: if ``issuemodel.Issue.get`` finds nothing for the id
    """
    checked_id = is_positive_integer(issue_id, context)
    if issuemodel.Issue.get(checked_id, session=context['session']):
        return checked_id
    # Only the fixed label goes through translation; the id is appended after.
    not_found = toolkit._('Issue not found')
    raise toolkit.Invalid(not_found + ': %s' % checked_id)
def issue_search(context, data_dict):
    """
    Authorize an issue search by delegating to the ``package_search`` check.

    :param context: auth context; ``context['user']`` is read only to build
        the denial message
    :param data_dict: search parameters; a shallow ``dict`` copy is handed to
        the access check
    :returns: ``{'success': True}`` when the check passes, otherwise a dict
        with ``'success': False`` and an explanatory ``'msg'``
    """
    try:
        p.toolkit.check_access('package_search', context, dict(data_dict))
    except p.toolkit.NotAuthorized:
        # Translate the constant template first and interpolate afterwards:
        # a message with the user name already substituted can never match
        # an entry in the translation catalog.
        template = p.toolkit._('User {0} not authorized for action')
        return {
            'success': False,
            'msg': template.format(str(context['user'])),
        }
    return {'success': True}
def _authorize_or_abort(self, context):
    """
    Abort with a 401 unless the ``package_create`` access check passes.

    :param context: context passed straight to ``toolkit.check_access``
    """
    try:
        toolkit.check_access('package_create', context)
    except toolkit.NotAuthorized:
        denial = toolkit._('Unauthorized to create a dataset')
        toolkit.abort(401, denial)
# Start of an openness report for one organization, optionally including the
# organizations returned by lib.go_down_tree(org).
# NOTE(review): this definition is cut off in the visible source. The package
# query at the end stops on a line continuation, and nothing that fills
# score_counts, rows or num_packages (or returns a value) is shown, so the
# return contract cannot be documented from here -- confirm against the full file.
def openness_for_organization(organization=None, include_sub_organizations=False):
org = model.Group.get(organization)
# A falsy lookup result is reported to the caller as ObjectNotFound.
if not org:
raise p.toolkit.ObjectNotFound
if not include_sub_organizations:
orgs = [org]
else:
# presumably yields org together with its descendants -- TODO confirm
orgs = lib.go_down_tree(org)
# Context dict with auth checks disabled; no use of it is visible in this excerpt.
context = {'model': model, 'session': model.Session, 'ignore_auth': True}
# Accumulators initialised here; the code that updates them is not visible.
score_counts = Counter()
rows = []
num_packages = 0
for org in orgs:
# NB org.packages() misses out many - see:
# http://redmine.dguteam.org.uk/issues/1844
# Query packages directly: those owned by this org whose state is 'active'.
pkgs = model.Session.query(model.Package) \
.filter_by(owner_org=org.id) \
.filter_by(state='active') \
def get_query_params():
    '''Collect the persistent-filter query params from the current request.

    Only ``q`` and ``filters`` are considered, and a key is included only
    when the request supplies a truthy value for it. The result is meant to
    be fed back into urls so the filters persist.

    :returns: dict
    '''
    request_params = toolkit.request.params
    candidates = ((key, request_params.get(key)) for key in (u'q', u'filters'))
    return {key: value for key, value in candidates if value}
def update_config(self, config):
    """
    Add the ``templates`` directory via ``p.toolkit.add_template_directory``.

    :param config: configuration object handed on to the toolkit call
    """
    template_dir = 'templates'
    p.toolkit.add_template_directory(config, template_dir)
def reported_comments(self, organization_id):
    """
    Render the comment moderation page for an organization.

    Looks the organization up with the ``organization_show`` action, searches
    its comments with ``issue_comment_search`` (``only_hidden`` set), and
    renders ``issues/comment_moderation.html`` with both results.

    :param organization_id: value passed as ``id`` to ``organization_show``
    :returns: the rendered template
    """
    # Every step stays inside the try block, so an ObjectNotFound raised by
    # any of them ends in the same 404 response.
    try:
        show_organization = toolkit.get_action('organization_show')
        organization = show_organization(data_dict={'id': organization_id})

        search_comments = toolkit.get_action('issue_comment_search')
        comments = search_comments(data_dict={
            'organization_id': organization['id'],
            'only_hidden': True,
        })

        template_vars = {
            'comments': comments,
            'organization': organization,
        }
        return toolkit.render(
            'issues/comment_moderation.html',
            extra_vars=template_vars,
        )
    except toolkit.ObjectNotFound:
        toolkit.abort(404, toolkit._('Organization not found'))