Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def assert_correct_etag(self, patch_response):
etag = patch_response.get_json()[config.ETAG]
get_resp = self.client.get(self.url).get_json()
get_etag = get_resp[config.ETAG]
self.assertEqual(etag, get_etag)
def test_post_with_pre_save_hook(self):
# resulting etag has to match (etag must be computed from
# modified data, not from original!)
data = {'a': 'hey'}
response = self.client.post('/hawkeydoc/', data='{"a": "hey"}',
content_type='application/json')
self.assertEqual(response.status_code, 201)
resp_json = response.get_json()
self.assertEqual(resp_json[config.STATUS], "OK")
etag = resp_json[config.ETAG]
# verify etag
resp = self.client.get('/hawkeydoc/%s' % resp_json['_id'])
self.assertEqual(etag, resp.get_json()[config.ETAG])
def test_etag_in_item_and_resource(self):
# etag of some entity has to be the same when fetching one item compared
# to etag of part of feed (resource)
d = ComplexDoc().save()
feed = self.client.get('/complexdoc/').get_json()
item = self.client.get('/complexdoc/%s' % d.id).get_json()
try:
self.assertEqual(feed[config.ITEMS][0][config.ETAG], item[config.ETAG])
finally:
d.delete()
def sqla_object_to_dict(obj, fields):
""" Creates a dict containing copies of the requested fields from the
SQLAlchemy query result """
if config.LAST_UPDATED not in fields:
fields.append(config.LAST_UPDATED)
if config.DATE_CREATED not in fields:
fields.append(config.DATE_CREATED)
if config.ETAG not in fields \
and getattr(config, 'IF_MATCH', True):
fields.append(config.ETAG)
result = {}
for field in map(lambda f: f.split('.', 1)[0], fields):
try:
val = obj.__getattribute__(field)
# If association proxies are embedded, their values must be copied
# since they are garbage collected when Eve try to encode the
# response.
if hasattr(val, 'copy'):
val = val.copy()
result[field] = _sanitize_value(val)
except AttributeError:
# Ignore if the requested field does not exist
# (may be wrong embedding parameter)
def sqla_object_to_dict(obj, fields):
""" Creates a dict containing copies of the requested fields from the
SQLAlchemy query result """
if config.LAST_UPDATED not in fields:
fields.append(config.LAST_UPDATED)
if config.DATE_CREATED not in fields:
fields.append(config.DATE_CREATED)
if config.ETAG not in fields \
and getattr(config, 'IF_MATCH', True):
fields.append(config.ETAG)
result = {}
for field in map(lambda f: f.split('.', 1)[0], fields):
try:
val = obj.__getattribute__(field)
# If association proxies are embedded, their values must be copied
# since they are garbage collected when Eve try to encode the
# response.
if hasattr(val, 'copy'):
val = val.copy()
result[field] = _sanitize_value(val)
except AttributeError:
latest_doc = None
cursor = None
# calculate last_modified before get_old_document rolls back the document,
# allowing us to invalidate the cache when _latest_version changes
last_modified = last_updated(document)
# synthesize old document version(s)
if resource_def["versioning"] is True:
latest_doc = document
document = get_old_document(resource, req, lookup, document, version)
# meld into response document
build_response_document(document, resource, embedded_fields, latest_doc)
if config.IF_MATCH:
etag = document[config.ETAG]
if resource_def["versioning"] is True:
# In order to keep the LATEST_VERSION field up to date in client
# caches, changes to the latest version should invalidate cached
# copies of previous verisons. Incorporate the latest version into
# versioned document ETags on the fly to ensure 'If-None-Match'
# comparisons support this caching behavior.
etag += str(document[config.LATEST_VERSION])
# check embedded fields resolved in build_response_document() for more
# recent last updated timestamps. We don't want to respond 304 if embedded
# fields have changed
for field in embedded_fields:
embedded_document = document.get(field)
if isinstance(embedded_document, dict):
embedded_last_updated = last_updated(embedded_document)
if embedded_last_updated > last_modified:
.. versionchanged:: 0.7
Only raise OriginalChangedError() if IF_MATCH is enabled. Closes
#920.
.. versionchanged:: 0.6.1
Support for PyMongo 3.0.
.. versionchanged:: 0.6
Return 400 if an attempt is made to update/replace an immutable
field.
"""
id_field = config.DOMAIN[resource]['id_field']
query = {id_field: id_}
if config.ETAG in original:
query[config.ETAG] = original[config.ETAG]
datasource, filter_, _, _ = self._datasource_ex(
resource, query)
coll = self.get_collection_with_write_concern(datasource, resource)
try:
result = coll.replace_one(filter_, changes) if replace else \
coll.update_one(filter_, changes)
if (result and result.acknowledged and config.IF_MATCH and
result.modified_count == 0):
raise self.OriginalChangedError()
except pymongo.errors.DuplicateKeyError as e:
abort(400, description=debug_error_message(
'pymongo.errors.DuplicateKeyError: %s' % e
))
# ``ENFORCE_IF_MATCH`` fields.
abort(
428,
description="To edit a document "
"its etag must be provided using the If-Match header",
)
# ensure the retrieved document has LAST_UPDATED and DATE_CREATED,
# eventually with same default values as in GET.
document[config.LAST_UPDATED] = last_updated(document)
document[config.DATE_CREATED] = date_created(document)
if req.if_match and concurrency_check:
ignore_fields = config.DOMAIN[resource]["etag_ignore_fields"]
etag = document.get(
config.ETAG, document_etag(document, ignore_fields=ignore_fields)
)
if req.if_match != etag:
# client and server etags must match, or we don't allow editing
# (ensures that client's version of the document is up to date)
abort(412, description="Client and server etags don't match")
return document
abort(412, description="Client and server etags don't match")
# update oplog if needed
oplog_push(resource, document, "PUT")
insert_versioning_documents(resource, document)
# notify callbacks
getattr(app, "on_replaced")(resource, document, original)
getattr(app, "on_replaced_%s" % resource)(document, original)
# build the full response document
build_response_document(document, resource, embedded_fields, document)
response = document
if config.IF_MATCH:
etag = response[config.ETAG]
else:
issues = validator.errors
except DocumentError as e:
# TODO should probably log the error and abort 400 instead (when we
# got logging)
issues["validator exception"] = str(e)
except exceptions.HTTPException as e:
raise e
except Exception as e:
# consider all other exceptions as Bad Requests
app.logger.exception(e)
abort(400, description=debug_error_message("An exception occurred: %s" % e))
if issues:
response[config.ISSUES] = issues
response[config.STATUS] = config.STATUS_ERR
'item_url': 'regex("[0-9]+")'
}
}
projection = domain[resource]['datasource']['projection'] = {}
if hasattr(cls_, '_eve_resource'):
dict_update(domain[resource], cls_._eve_resource)
all_orm_descriptors = inspect(cls_).all_orm_descriptors
for desc in all_orm_descriptors:
if isinstance(desc, InstrumentedAttribute):
prop = desc.property
if prop.key in (config.LAST_UPDATED,
config.DATE_CREATED,
config.ETAG):
continue
if hasattr(prop, 'columns') and \
hasattr(prop.columns[0], 'foreign_keys') and \
len(prop.columns[0].foreign_keys) > 0:
continue
schema = domain[resource]['schema'][prop.key] = {}
self.register_column(prop, schema, projection)
elif desc.extension_type is HYBRID_PROPERTY:
schema = domain[resource]['schema'][desc.__name__] = {}
schema['unique'] = False
schema['required'] = False
schema['readonly'] = True
schema['type'] = 'string'
projection[desc.__name__] = 1