Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param action: the action name, e.g. 'package_create'
:param data_dict: the dict to pass to the action as JSON,
defaults to {}
:param context: not supported
:param files: not supported
This function parses the response from the server as JSON and
returns the decoded value. When an error is returned this
function will convert it back to an exception that matches the
one the action function itself raised.
"""
if context:
raise CKANAPIError("TestAppCKAN.call_action does not support "
"use of context parameter, use apikey instead")
if files:
raise CKANAPIError("TestAppCKAN.call_action does not support "
"file uploads, consider contributing it if you need it")
url, data, headers = prepare_action(action, data_dict,
apikey or self.apikey)
r = self.test_app.post('/' + url, data, headers, expect_errors=True)
return reverse_apicontroller_action(url, r.status, r.body)
"""
:param action: the action name, e.g. 'package_create'
:param data_dict: the dict to pass to the action as JSON,
defaults to {}
:param context: always set to None for RemoteCKAN
:param apikey: API key for authentication
:param files: None or {field-name: file-to-be-sent, ...}
:param requests_kwargs: kwargs for requests get/post calls
This function parses the response from the server as JSON and
returns the decoded value. When an error is returned this
function will convert it back to an exception that matches the
one the action function itself raised.
"""
if context:
raise CKANAPIError("RemoteCKAN.call_action does not support "
"use of context parameter, use apikey instead")
if files and self.get_only:
raise CKANAPIError("RemoteCKAN: files may not be sent when "
"get_only is True")
url, data, headers = prepare_action(
action, data_dict, apikey or self.apikey, files)
headers['User-Agent'] = self.user_agent
url = self.address.rstrip('/') + '/' + url
requests_kwargs = requests_kwargs or {}
if not self.session:
self.session = requests.Session()
if self.get_only:
status, response = self._request_fn_get(url, data_dict, headers, requests_kwargs)
else:
status, response = self._request_fn(url, data, headers, files, requests_kwargs)
return reverse_apicontroller_action(url, status, response)
def __str__(self):
return repr(self.error_dict)
class NotFound(CKANAPIError):
    """
    CKANAPIError subclass carrying an optional extra message.

    :param extra_msg: optional message, kept on ``self.extra_msg``
    """

    def __init__(self, extra_msg=None):
        self.extra_msg = extra_msg

    def __str__(self):
        # __str__ must return a string: extra_msg defaults to None, and
        # returning None here makes str(exc) raise TypeError
        msg = self.extra_msg
        if msg is None:
            return ''
        return msg if isinstance(msg, str) else str(msg)
class SearchQueryError(CKANAPIError):
    """CKANAPIError subclass with no behaviour of its own."""
class SearchError(CKANAPIError):
    """CKANAPIError subclass with no behaviour of its own."""
class SearchIndexError(CKANAPIError):
    """CKANAPIError subclass with no behaviour of its own."""
else:
# import ckan worked, so these must not fail
from ckan.logic import (NotAuthorized, NotFound, ValidationError)
from ckan.lib.search import (SearchQueryError, SearchError,
SearchIndexError)
def populate_datastore_fields(ckan, dataset):
    """
    Update dataset dict in-place with datastore_fields values
    in every resource with datastore active using ckan
    LocalCKAN/RemoteCKAN instance.

    :param ckan: object providing ``call_action(action, data_dict)``
    :param dataset: dataset dict; each entry of its ``'resources'`` list
        that has a truthy ``'datastore_active'`` gets a
        ``'datastore_fields'`` key
    :returns: None, the dataset is modified in place

    A resource whose ``datastore_search`` call raises CKANAPIError is
    left unchanged.
    """
    # 'resources' may be absent or explicitly None; treat both as empty
    for res in dataset.get('resources') or []:
        if not res.get('datastore_active', False):
            continue
        try:
            # limit 0: only ds['fields'] is read from the result
            ds = ckan.call_action('datastore_search', {
                'resource_id': res['id'],
                'limit': 0})
        except CKANAPIError:
            # best effort: skip resources whose lookup fails
            continue
        res['datastore_fields'] = ds['fields']
class CLIError(Exception):
    """Plain Exception subclass with no added behaviour."""
try:
    import ckan
except ImportError:
    # Implement the minimum to be compatible with existing errors
    # without requiring CKAN.  Each class is defined exactly once.

    class NotAuthorized(CKANAPIError):
        """CKANAPIError subclass with no behaviour of its own."""

    class ValidationError(CKANAPIError):
        """
        CKANAPIError subclass carrying an error dict.

        :param error_dict: kept on ``self.error_dict``; ``str(exc)`` is
            its ``repr``
        """

        def __init__(self, error_dict):
            self.error_dict = error_dict

        def __str__(self):
            return repr(self.error_dict)

    class NotFound(CKANAPIError):
        """
        CKANAPIError subclass carrying an optional extra message.

        :param extra_msg: optional message, kept on ``self.extra_msg``
        """

        def __init__(self, extra_msg=None):
            self.extra_msg = extra_msg

        def __str__(self):
            # __str__ must return a string: extra_msg defaults to None,
            # and returning None here makes str(exc) raise TypeError
            msg = self.extra_msg
            if msg is None:
                return ''
            return msg if isinstance(msg, str) else str(msg)

    class SearchQueryError(CKANAPIError):
        """CKANAPIError subclass with no behaviour of its own."""

    class SearchError(CKANAPIError):
        """CKANAPIError subclass with no behaviour of its own."""

    class SearchIndexError(CKANAPIError):
        """CKANAPIError subclass with no behaviour of its own."""
else:
    # import ckan worked, so these must not fail
    from ckan.logic import (NotAuthorized, NotFound, ValidationError)
    from ckan.lib.search import (SearchQueryError, SearchError,
        SearchIndexError)
class NotAuthorized(CKANAPIError):
    """CKANAPIError subclass with no behaviour of its own."""
class ValidationError(CKANAPIError):
    """
    CKANAPIError subclass carrying an error dict.

    :param error_dict: kept on ``self.error_dict``; ``str(exc)`` is
        its ``repr``
    """

    def __init__(self, error_dict):
        self.error_dict = error_dict

    def __str__(self):
        details = self.error_dict
        return repr(details)
class NotFound(CKANAPIError):
    """
    CKANAPIError subclass carrying an optional extra message.

    :param extra_msg: optional message, kept on ``self.extra_msg``
    """

    def __init__(self, extra_msg=None):
        self.extra_msg = extra_msg

    def __str__(self):
        # __str__ must return a string: extra_msg defaults to None, and
        # returning None here makes str(exc) raise TypeError
        msg = self.extra_msg
        if msg is None:
            return ''
        return msg if isinstance(msg, str) else str(msg)
class SearchQueryError(CKANAPIError):
    """CKANAPIError subclass with no behaviour of its own."""
class SearchError(CKANAPIError):
    """CKANAPIError subclass with no behaviour of its own."""
class SearchIndexError(CKANAPIError):
    """CKANAPIError subclass with no behaviour of its own."""
else:
# import ckan worked, so these must not fail
from ckan.logic import (NotAuthorized, NotFound, ValidationError)
from ckan.lib.search import (SearchQueryError, SearchError,
SearchIndexError)