Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
resourceId = params.get('resourceId', None)
resourceName = params.get('resourceName', None)
startTime = time.time()
with ProgressContext(True, user=self.getCurrentUser(),
title='Progress Test', message='Progress Message',
total=duration, resource={'_id': resourceId},
resourceName=resourceName) as ctx:
for current in range(duration):
if self.stop:
break
ctx.update(current=current)
wait = startTime + current + 1 - time.time()
if wait > 0:
time.sleep(wait)
if test == 'error':
raise RestException('Progress error test.')
def _getAnnotation(self, user, id, params):
"""
Get a generator function that will yield the json of an annotation.
:param user: the user that needs read access on the annoation and its
parent item.
:param id: the annotation id.
:param params: paging and region parameters for the annotation.
:returns: a function that will return a generator.
"""
# Set the response time limit to a very long value
setResponseTimeLimit(86400)
annotation = Annotation().load(
id, region=params, user=user, level=AccessType.READ, getElements=False)
if annotation is None:
raise RestException('Annotation not found', 404)
# Ensure that we have read access to the parent item. We could fail
# faster when there are permissions issues if we didn't load the
# annotation elements before checking the item access permissions.
# This had been done via the filtermodel decorator, but that doesn't
# work with yielding the elements one at a time.
annotation = Annotation().filter(annotation, self.getCurrentUser())
annotation['annotation']['elements'] = []
breakStr = b'"elements": ['
base = json.dumps(annotation, sort_keys=True, allow_nan=False,
cls=JsonEncoder).encode('utf8').split(breakStr)
centroids = str(params.get('centroids')).lower() == 'true'
def generateResult():
info = {}
idx = 0
def createItemAnnotations(self, item, annotations):
user = self.getCurrentUser()
if not isinstance(annotations, list):
annotations = [annotations]
for entry in annotations:
if not isinstance(entry, dict):
raise RestException('Entries in the annotation list must be JSON objects.')
annotation = entry.get('annotation', entry)
try:
Annotation().createAnnotation(item, user, annotation)
except ValidationException as exc:
logger.exception('Failed to validate annotation')
raise RestException(
"Validation Error: JSON doesn't follow schema (%r)." % (
exc.args, ))
return len(annotations)
:param allowConstants: Whether the keywords Infinity, -Infinity, and NaN
should be allowed. These keywords are valid JavaScript and will parse
to the correct float values, but are not valid in strict JSON.
:type allowConstants: bool
"""
if allowConstants:
_parseConstants = None
else:
def _parseConstants(val):
raise RestException('Error: "%s" is not valid JSON.' % val)
text = cherrypy.request.body.read().decode('utf8')
try:
return json.loads(text, parse_constant=_parseConstants)
except ValueError:
raise RestException('Invalid JSON passed in request body.')
:param params: The URL query parameters.
:type params: dict
:param defaultSortField: If the client did not pass a 'sort' parameter,
set this to choose a default sort field. If None, the results will
be returned unsorted.
:type defaultSortField: str or None
:param defaultSortDir: Sort direction.
:type defaultSortDir: girder.constants.SortDir
"""
try:
offset = int(params.get('offset', 0))
limit = int(params.get('limit', 50))
sortdir = int(params.get('sortdir', defaultSortDir))
except ValueError:
raise RestException('Invalid value for offset, limit, or sortdir parameter.')
if sortdir not in [SortDir.ASCENDING, SortDir.DESCENDING]:
raise RestException('Invalid value for sortdir parameter.')
if 'sort' in params:
sort = [(params['sort'].strip(), sortdir)]
elif isinstance(defaultSortField, six.string_types):
sort = [(defaultSortField, sortdir)]
else:
sort = None
return limit, offset, sort
def _validateJsonType(self, name, info, val):
if info.get('schema') is not None:
try:
jsonschema.validate(val, info['schema'])
except jsonschema.ValidationError as e:
raise RestException('Invalid JSON object for parameter %s: %s' % (
name, str(e)))
elif info['requireObject'] and not isinstance(val, dict):
raise RestException('Parameter %s must be a JSON object.' % name)
elif info['requireArray'] and not isinstance(val, list):
raise RestException('Parameter %s must be a JSON array.' % name)
def _validate_preferredAssetstore(self, value):
"""Validate the preferredAssetstore parameter.
:param value: the proposed value.
:returns: the validated value: either None or 'current' to use the
current assetstore or an assetstore ID.
"""
if not value or value == 'current':
return None
try:
value = ObjectId(value)
except InvalidId:
raise RestException(
'Invalid preferredAssetstore. Must either be an assetstore '
'ID, or be blank or "current" to use the current assetstore.',
extra='preferredAssetstore')
return value
:raises: `RestException`, when no route can be matched.
"""
if not self._routes:
raise GirderException('No routes defined for resource')
for route, handler in self._routes[method][len(path)]:
wildcards = {}
for routeComponent, pathComponent in six.moves.zip(route, path):
if routeComponent[0] == ':': # Wildcard token
wildcards[routeComponent[1:]] = pathComponent
elif routeComponent != pathComponent: # Exact match token
break
else:
return route, handler, wildcards
raise RestException('No matching route for "%s %s"' % (method.upper(), '/'.join(path)))
required, you can simply pass it as a string.
:type required: `list, tuple, or str`
:param provided: The list of provided parameters.
:type provided: dict
"""
if provided is None and isinstance(required, dict):
for name, val in six.viewitems(required):
if val is None:
raise RestException('Parameter "%s" is required.' % name)
else:
if isinstance(required, six.string_types):
required = (required,)
for param in required:
if provided is None or param not in provided:
raise RestException('Parameter "%s" is required.' % param)
def _validateCsrfToken(self, state):
"""
Tests the CSRF token value in the cookie to authenticate the user as
the originator of the OAuth2 login. Raises a RestException if the token
is invalid.
"""
csrfTokenId, _, redirect = state.partition('.')
token = Token().load(csrfTokenId, objectId=False, level=AccessType.READ)
if token is None:
raise RestException('Invalid CSRF token (state="%s").' % state, code=403)
Token().remove(token)
if token['expires'] < datetime.datetime.utcnow():
raise RestException('Expired CSRF token (state="%s").' % state,
code=403)
if not redirect:
raise RestException('No redirect location (state="%s").' % state)
return redirect