Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup_class(self):
# Return a test app with the custom config.
app = ckan.config.middleware.make_app(config['global_conf'], **config)
self.app = webtest.TestApp(app)
ckan.plugins.load('issues')
ckan.tests.CreateTestData.create()
self.sysadmin_user = ckan.model.User.get('testsysadmin')
self.dataset = model.Package.get('annakarenina')
self.extra_environ_tester = {'REMOTE_USER': 'tester'}
self.context = {
'model': model,
'auth_user_obj': self.sysadmin_user,
'user': self.sysadmin_user.name
}
self.issue = logic.get_action('issue_create')(self.context, {
'title': 'General test issue',
'description': 'Abc\n\n## Section',
'dataset_id': self.dataset.id
})
self.comment = logic.get_action('issue_comment_create')(self.context, {
'issue_id': self.issue['id'],
'comment': 'Test comment'
})
def create_package(self, context):
"""
Setup the CKAN datastore
Return the package
@param context:
@return: package
"""
# Merge package params with default params
params = dict(chain(self.default_package_params.iteritems(), self.package.iteritems()))
try:
# Try and load the KE EMu package
package = logic.get_action('package_show')(context, {'id': params['name']})
except logic.NotFound:
# KE EMu package not found; create it
# Error adds dataset_show to __auth_audit: remove it
context['__auth_audit'] = []
package = logic.get_action('package_create')(context, params)
return package
c.search_facets = query['search_facets']
# Add the featured related applications to the template context.
data_dict = {
'type_filter': 'application',
'featured': True,
}
c.feautured_related_apps = logic.get_action('related_list')(context,
data_dict)
# Add the featured related ideas to the template context.
data_dict = {
'type_filter': 'idea',
'featured': True,
}
c.feautured_related_ideas = logic.get_action('related_list')(context,
data_dict)
return render('home/index.html')
from ckan.logic import validate
import ckan.lib.helpers as h
import ckanext.issues.model as issuemodel
from ckanext.issues.logic import schema
from ckanext.issues.exception import ReportAlreadyExists
from ckanext.issues.lib.helpers import get_issue_subject, get_site_title
try:
import ckan.authz as authz
except ImportError:
import ckan.new_authz as authz
from pylons import config
from sqlalchemy.exc import IntegrityError
from sqlalchemy import desc
_get_or_bust = logic.get_or_bust
log = logging.getLogger(__name__)
def _add_reports(obj, can_edit, current_user):
reports = [r.user_id for r in obj.abuse_reports]
if can_edit:
return reports
else:
user_obj = model.User.get(current_user)
if user_obj and user_obj.id in reports:
return [user_obj.id]
else:
return []
def _update_resource(context, resource):
"""
Use CKAN API to update the given resource.
Returns the content of the response.
"""
resource['last_modified'] = datetime.now().isoformat()
try:
logic.get_action('resource_update')(context, resource)
except Exception as e:
log.exception(e)
raise CkanError('ckan failed to update resource')
def create_datastore_resource(self):
"""
Create the CKAN datastore resource
@return: resource_id
"""
user = logic.get_action('get_site_user')({'model': model, 'ignore_auth': True}, {})
context = {'model': model, 'session': model.Session, 'user': user['name'], 'extras_as_string': True}
# Create a package for the new datastore
package = self.create_package(context)
print 'Creating datastore %s' % self.name
# Get all existing resources keyed by name
existing_resources = {r['name']: r for r in package['resources']}
try:
resource_id = existing_resources[self.name]['id']
except KeyError: # We don't have a datastore resource so create one
import ckan.lib.authenticator as authenticator
import ckan.lib.base as base
import ckan.lib.csrf_token as csrf_token
import ckan.lib.mailer as mailer
abort = base.abort
render = base.render
check_access = ckan.logic.check_access
NotAuthorized = ckan.logic.NotAuthorized
NotFound = ckan.logic.NotFound
get_action = ckan.logic.get_action
unflatten = dictization_functions.unflatten
DataError = dictization_functions.DataError
UsernamePasswordError = logic.UsernamePasswordError
ValidationError = logic.ValidationError
import logging
log = logging.getLogger(__name__)
import auth
def admin_only(context, data_dict=None):
return {'success': False, 'msg': 'Access restricted to system administrators'}
def set_repoze_user(user_id):
'''Set the repoze.who cookie to match a given user_id'''
if 'repoze.who.plugins' in request.environ:
rememberer = request.environ['repoze.who.plugins']['friendlyform']
import ckan.lib.base as base
import ckan.model as model
import ckan.plugins as p
from ckan.common import _, c
import logging
from ckanext.nhm.lib.dwc import DwC
from ckanext.nhm.controllers.record import RecordController
log = logging.getLogger(__name__)
render = base.render
abort = base.abort
redirect = base.redirect
NotFound = logic.NotFound
NotAuthorized = logic.NotAuthorized
ValidationError = logic.ValidationError
get_action = logic.get_action
class DarwinCoreController(RecordController):
"""
Controller for displaying KE EMu records as DwC
"""
def view(self, package_name, resource_id, record_id):
"""
View an individual record
:param id:
:param resource_id:
:param record_id:
:return: html
import ckanext.stats.stats as stats_lib
from datetime import datetime, timedelta, date
from ckan.model import TrackingSummary, Resource
from sqlalchemy import and_
from pylons import config
from sqlalchemy import func
from dateutil import rrule
render = base.render
abort = base.abort
redirect = base.redirect
NotFound = logic.NotFound
NotAuthorized = logic.NotAuthorized
get_action = logic.get_action
check_access = logic.check_access
log = logging.getLogger(__name__)
class StatsController(p.toolkit.BaseController):
"""
Controller for displaying stats pages
"""
def __before__(self, action, **env):
base.BaseController.__before__(self, action, **env)
try:
self.context = {'model': model, 'user': c.user or c.author, 'auth_user_obj': c.userobj}
# check_access('site_read', self.context)
import logging
import re
import json
import ckan.model as model
import ckan.logic as logic
import ckan.plugins as p
from copy import deepcopy
from ckan.plugins import toolkit as tk
from pylons import config
from collections import OrderedDict
from ckanext.nhm.views.default import DefaultView
from ckanext.nhm.views.dwc import DarwinCoreView
ValidationError = logic.ValidationError
log = logging.getLogger(__name__)
get_action = logic.get_action
class SpecimenView(DefaultView):
"""
Controller for displaying a specimen record
"""
resource_id = config.get("ckanext.nhm.specimen_resource_id")
grid_default_columns = DarwinCoreView.grid_default_columns
grid_column_widths = DarwinCoreView.grid_column_widths
field_facets = [