Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
auth_field, request_auth_value = auth_field_and_value(resource)
if auth_field and original.get(auth_field) != request_auth_value:
abort(403)
last_modified = None
etag = None
issues = {}
object_id = original[resource_def["id_field"]]
response = {}
if config.BANDWIDTH_SAVER is True:
embedded_fields = []
else:
req = parse_request(resource)
embedded_fields = resolve_embedded_fields(resource, req)
try:
document = parse(payload, resource)
resolve_sub_resource_path(document, resource)
if skip_validation:
validation = True
else:
validation = validator.validate_replace(document, object_id, original)
# Apply coerced values
document = validator.document
if validation:
# sneak in a shadow copy if it wasn't already there
late_versioning_catch(original, resource)
# update meta
if skip_validation
else app.validator(
schema, resource=resource, allow_unknown=resource_def["allow_unknown"]
)
)
documents = []
results = []
failures = 0
id_field = resource_def["id_field"]
if config.BANDWIDTH_SAVER is True:
embedded_fields = []
else:
req = parse_request(resource)
embedded_fields = resolve_embedded_fields(resource, req)
# validation, and additional fields
if payl is None:
payl = payload()
if isinstance(payl, dict):
payl = [payl]
if not payl:
# empty bulk insert
abort(400, description=debug_error_message("Empty bulk insert"))
if len(payl) > 1 and not config.DOMAIN[resource]["bulk_enabled"]:
abort(400, description=debug_error_message("Bulk insert not allowed"))
for value in payl:
def _resolve_embedded_item(self, doc, req):
"""Resolve embedded fields
:param dict doc: document to resolved embedded fields
:param req: request object
:return:
"""
if not req:
return
if doc.get('type') == 'event':
# get the embedded fields from events resources
fields = resolve_embedded_fields(EventsResource.endpoint_name, req) or []
resolve_embedded_documents(doc.get('published_item'), EventsResource.endpoint_name, fields)
elif doc.get('type') == 'planning':
fields = resolve_embedded_fields(PlanningResource.endpoint_name, req) or []
resolve_embedded_documents(doc.get('published_item'), PlanningResource.endpoint_name, fields)
elif doc.get('type') == 'planning_featured':
fields = resolve_embedded_fields(PlanningFeaturedResource.endpoint_name, req) or []
resolve_embedded_documents(doc.get('published_item'), PlanningFeaturedResource.endpoint_name, fields)
def _resolve_embedded_item(self, doc, req):
"""Resolve embedded fields
:param dict doc: document to resolved embedded fields
:param req: request object
:return:
"""
if not req:
return
if doc.get('type') == 'event':
# get the embedded fields from events resources
fields = resolve_embedded_fields(EventsResource.endpoint_name, req) or []
resolve_embedded_documents(doc.get('published_item'), EventsResource.endpoint_name, fields)
elif doc.get('type') == 'planning':
fields = resolve_embedded_fields(PlanningResource.endpoint_name, req) or []
resolve_embedded_documents(doc.get('published_item'), PlanningResource.endpoint_name, fields)
elif doc.get('type') == 'planning_featured':
fields = resolve_embedded_fields(PlanningFeaturedResource.endpoint_name, req) or []
resolve_embedded_documents(doc.get('published_item'), PlanningFeaturedResource.endpoint_name, fields)
:param dict doc: document to resolved embedded fields
:param req: request object
:return:
"""
if not req:
return
if doc.get('type') == 'event':
# get the embedded fields from events resources
fields = resolve_embedded_fields(EventsResource.endpoint_name, req) or []
resolve_embedded_documents(doc.get('published_item'), EventsResource.endpoint_name, fields)
elif doc.get('type') == 'planning':
fields = resolve_embedded_fields(PlanningResource.endpoint_name, req) or []
resolve_embedded_documents(doc.get('published_item'), PlanningResource.endpoint_name, fields)
elif doc.get('type') == 'planning_featured':
fields = resolve_embedded_fields(PlanningFeaturedResource.endpoint_name, req) or []
resolve_embedded_documents(doc.get('published_item'), PlanningFeaturedResource.endpoint_name, fields)
def _perform_find(resource, lookup):
"""
.. versionadded:: 0.7
"""
documents = []
response = {}
etag = None
req = parse_request(resource)
embedded_fields = resolve_embedded_fields(resource, req)
# continue processing the full request
last_update = epoch()
# If-Modified-Since disabled on collections (#334)
req.if_modified_since = None
cursor, count = app.data.find(
resource, req, lookup, perform_count=not config.OPTIMIZE_PAGINATION_FOR_SPEED
)
# If soft delete is enabled, data.find will not include items marked
# deleted unless req.show_deleted is True
for document in cursor:
build_response_document(document, resource, embedded_fields)
documents.append(document)
.. versionchanged:: 0.0.5
Support for user-restricted access to resources.
Support for LAST_UPDATED field missing from documents, because they were
created outside the API context.
.. versionchanged:: 0.0.4
Added the ``requires_auth`` decorator.
.. versionchanged:: 0.0.3
Superflous ``response`` container removed. Links wrapped with
``_links``. Links are now properly JSON formatted.
"""
req = parse_request(resource)
resource_def = config.DOMAIN[resource]
embedded_fields = resolve_embedded_fields(resource, req)
soft_delete_enabled = config.DOMAIN[resource]["soft_delete"]
if soft_delete_enabled:
# GET requests should always fetch soft deleted documents from the db
# They are handled and included in 404 responses below.
req.show_deleted = True
document = app.data.find_one(resource, req, **lookup)
if not document:
abort(404)
response = {}
etag = None
version = request.args.get(config.VERSION_PARAM)
latest_doc = None
cursor = None
validator = app.validator(
schema, resource=resource, allow_unknown=resource_def["allow_unknown"]
)
object_id = original[resource_def["id_field"]]
last_modified = None
etag = None
issues = {}
response = {}
if config.BANDWIDTH_SAVER is True:
embedded_fields = []
else:
req = parse_request(resource)
embedded_fields = resolve_embedded_fields(resource, req)
try:
updates = parse(payload, resource)
if skip_validation:
validation = True
else:
validation = validator.validate_update(
updates, object_id, original, normalize_document
)
updates = validator.document
if validation:
# Apply coerced values
# sneak in a shadow copy if it wasn't already there
late_versioning_catch(original, resource)