Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wrapper(self):
    """Test fixture: POST a ComplexDoc covering nested dicts, nulls, lists,
    lists of dicts and nested lists; stash the created document's metadata
    (_id, url, etag, updated) on the test instance; run the wrapped test;
    always clean up afterwards.

    NOTE(review): ``f`` and ``ComplexDoc`` come from the enclosing decorator
    scope, which is outside this chunk — confirm against the full file.
    """
    payload = '{"i": {"a": "hello"}, "d": {"x": null}, "l": ["m", "n"], ' + \
              '"o": [{"a":"hi"},{"b":9}], "p": [{"ll": ["q", "w"]}]}'
    response = self.client.post('/complexdoc/',
                                data=payload,
                                content_type='application/json')
    json_data = response.get_json()
    # Remember the created document's metadata for use inside the test.
    self._id = json_data[config.ID_FIELD]
    self.url = '/complexdoc/%s' % json_data[config.ID_FIELD]
    self.etag = json_data[config.ETAG]
    # check if etags are okay: the ETag returned by POST must match what a
    # fresh GET of the same item reports.
    self.assertEqual(self.client.get(self.url).get_json()[config.ETAG], self.etag)
    # self._id = response[config.ID_FIELD]
    self.updated = json_data[config.LAST_UPDATED]
    try:
        f(self)
    finally:
        # Always remove the fixture documents, even if the test failed.
        ComplexDoc.objects().delete()
return wrapper
def test_post_simple(self):
    """POST a simple document and verify that the 201 response metadata
    (status, _id, LAST_UPDATED) agrees with a subsequent GET.

    Fix: removed the unused ``now = datetime.now()`` local — it was never
    read and only added a spurious wall-clock dependency to the test.
    """
    response = self.client.post('/simpledoc/',
                                data='{"a": "jimmy", "b": 23}',
                                content_type='application/json')
    # A successful insert must answer 201 Created.
    self.assertEqual(response.status_code, 201)
    post_data = response.get_json()
    self.assertEqual(post_data[config.STATUS], "OK")
    _id = post_data['_id']
    # Read the document back through the item endpoint.
    response = self.client.get('/simpledoc/%s' % _id)
    get_data = response.get_json()
    # updated field must match between the POST response and the GET
    self.assertEqual(post_data[config.LAST_UPDATED],
                     get_data[config.LAST_UPDATED])
def test_find_one(self):
    """Save a document directly via the ORM, fetch it through the item
    endpoint, and check both the Eve metadata and the payload fields."""
    saved = SimpleDoc(a='Tom', b=223).save()
    response = self.client.get('/simpledoc/%s' % saved.id)
    # has to return one record
    payload = response.get_json()
    # Eve metadata must be present on the returned item.
    for meta_field in (config.LAST_UPDATED, config.DATE_CREATED):
        self.assertIn(meta_field, payload)
    self.assertEqual(payload['_id'], str(saved.id))
    self.assertEqual(payload['a'], 'Tom')
    self.assertEqual(payload['b'], 223)
    saved.delete()
def sqla_object_to_dict(obj, fields):
    """ Creates a dict containing copies of the requested fields from the
    SQLAlchemy query result """
    # NOTE(review): this function is truncated in this chunk — the tail of
    # the for-loop (and the return of ``result``) lies outside the view.
    # NOTE(review): ``fields`` is mutated in place (metadata names are
    # appended) — confirm callers do not reuse the list afterwards.
    if config.LAST_UPDATED not in fields:
        fields.append(config.LAST_UPDATED)
    if config.DATE_CREATED not in fields:
        fields.append(config.DATE_CREATED)
    # Only expose the ETag when concurrency control is on; IF_MATCH
    # defaults to True when the setting is absent.
    if config.ETAG not in fields \
            and getattr(config, 'IF_MATCH', True):
        fields.append(config.ETAG)
    result = {}
    # Strip any dotted sub-field suffix ("rel.attr" -> "rel") before lookup.
    for field in map(lambda f: f.split('.', 1)[0], fields):
        try:
            val = obj.__getattribute__(field)
            # If association proxies are embedded, their values must be copied
            # since they are garbage collected when Eve try to encode the
            # response.
            if hasattr(val, 'copy'):
                val = val.copy()
    .. versionadded:: 0.3
    """
    # (review) Fragment: the enclosing method signature is outside this
    # chunk; this is the body of a "is the resource collection empty?"
    # check against MongoDB.
    datasource, filter_, _, _ = self.datasource(resource)
    coll = self.pymongo(resource).db[datasource]
    try:
        if not filter_:
            # faster, but we can only afford it if there's no predefined
            # filter on the datasource.
            return coll.count() == 0
        else:
            # fallback on find() since we have a filter to apply.
            try:
                # need to check if the whole resultset is missing, no
                # matter the IMS header.
                del filter_[config.LAST_UPDATED]
            except:
                pass
            return coll.find(filter_).count() == 0
    except pymongo.errors.OperationFailure as e:
        # see comment in :func:`insert()`.
        self.app.logger.exception(e)
        abort(500, description=debug_error_message(
            'pymongo.errors.OperationFailure: %s' % e
        ))
            'item_lookup': True,
            'item_lookup_field': config.ID_FIELD,
            'item_url': 'regex("[0-9]+")'
        }
    }
    # (review) Fragment: the opening of the resource-settings dict and the
    # enclosing function signature are outside this chunk.
    # Start with an empty projection; register_column fills it per field.
    projection = domain[resource]['datasource']['projection'] = {}
    # Allow the model class to override/extend the generated settings.
    if hasattr(cls_, '_eve_resource'):
        dict_update(domain[resource], cls_._eve_resource)
    all_orm_descriptors = inspect(cls_).all_orm_descriptors
    for desc in all_orm_descriptors:
        if isinstance(desc, InstrumentedAttribute):
            prop = desc.property
            # Eve manages its own meta fields; never register them here.
            if prop.key in (config.LAST_UPDATED,
                            config.DATE_CREATED,
                            config.ETAG):
                continue
            # Skip raw foreign-key columns (relationships cover them).
            if hasattr(prop, 'columns') and \
                    hasattr(prop.columns[0], 'foreign_keys') and \
                    len(prop.columns[0].foreign_keys) > 0:
                continue
            schema = domain[resource]['schema'][prop.key] = {}
            self.register_column(prop, schema, projection)
        elif desc.extension_type is HYBRID_PROPERTY:
            # Hybrid properties are computed values: expose as read-only
            # strings with no uniqueness/required constraints.
            schema = domain[resource]['schema'][desc.__name__] = {}
            schema['unique'] = False
            schema['required'] = False
            schema['readonly'] = True
            schema['type'] = 'string'
def _get_mapping_properties(self, resource_config, parent=None):
    """Build the search-backend mapping for a resource schema.

    Adds datetime mappings for Eve's DATE_CREATED / LAST_UPDATED meta
    fields, wires an optional ``_parent`` relation, and drops ``_id``
    (the backend manages document ids itself).
    """
    mapping = self._get_mapping(resource_config['schema'])
    # Eve's timestamp meta fields always map as datetimes.
    for meta_field in (config.DATE_CREATED, config.LAST_UPDATED):
        mapping['properties'][meta_field] = self._get_field_mapping(
            {'type': 'datetime'})
    if parent:
        mapping['_parent'] = {'type': parent.get('type')}
    # _id must never appear in the mapping body.
    mapping['properties'].pop('_id', None)
    return mapping
    .. versionadded:: 0.3
    """
    # (review) Fragment: enclosing method signature is outside this chunk;
    # this variant uses ``count_documents`` (the modern PyMongo API).
    # The trailing ``abort(...)`` call is also truncated by the chunk edge.
    datasource, filter_, _, _ = self.datasource(resource)
    coll = self.pymongo(resource).db[datasource]
    try:
        if not filter_:
            # faster, but we can only afford it if there's no predefined
            # filter on the datasource.
            return coll.count_documents({}) == 0
        else:
            # fallback on find() since we have a filter to apply.
            try:
                # need to check if the whole resultset is missing, no
                # matter the IMS header.
                del filter_[config.LAST_UPDATED]
            except:
                pass
            return coll.count_documents(filter_) == 0
    except pymongo.errors.OperationFailure as e:
        # see comment in :func:`insert()`.
        self.app.logger.exception(e)
        abort(
            500,
            description=debug_error_message(
                "pymongo.errors.OperationFailure: %s" % e
            ),
    # (review) Fragment: body of the per-document loop in Eve's POST
    # processing; ``value``, ``resource``, ``validator``, ``skip_validation``
    # and ``date_utc`` come from the enclosing scope outside this chunk.
    doc_issues = {}
    try:
        document = parse(value, resource)
        resolve_sub_resource_path(document, resource)
        if skip_validation:
            validation = True
        else:
            validation = validator.validate(document)
        if validation:  # validation is successful
            # validator might be not available if skip_validation. #726.
            if validator:
                # Apply coerced values
                document = validator.document

            # Populate meta and default fields
            document[config.LAST_UPDATED] = document[config.DATE_CREATED] = date_utc

            if config.DOMAIN[resource]["soft_delete"] is True:
                # New documents always start out not-deleted.
                document[config.DELETED] = False

            resolve_user_restricted_access(document, resource)
            store_media_files(document, resource)
            resolve_document_version(document, resource, "POST")
        else:
            # validation errors added to list of document issues
            doc_issues = validator.errors
    except DocumentError as e:
        doc_issues["validation exception"] = str(e)
    except Exception as e:
        # most likely a problem with the incoming payload, report back to
        # the client as if it was a validation issue
        app.logger.exception(e)
    # (review) Fragment: tail of Eve's item-delete flow; ``original``,
    # ``resource``, ``resource_def``, ``soft_delete_enabled``,
    # ``suppress_callbacks``, ``concurrency_check`` and ``all_done`` come
    # from the enclosing scope outside this chunk.
    if not original or (soft_delete_enabled and original.get(config.DELETED) is True):
        # Nothing to do: document is absent or already soft-deleted.
        return all_done()

    # notify callbacks
    if not suppress_callbacks:
        getattr(app, "on_delete_item")(resource, original)
        getattr(app, "on_delete_item_%s" % resource)(original)

    if soft_delete_enabled:
        # Instead of removing the document from the db, just mark it as deleted
        marked_document = copy.deepcopy(original)

        # Set DELETED flag and update metadata
        last_modified = datetime.utcnow().replace(microsecond=0)
        marked_document[config.DELETED] = True
        marked_document[config.LAST_UPDATED] = last_modified

        if config.IF_MATCH:
            # Recompute the ETag since the document content changed.
            resolve_document_etag(marked_document, resource)

        resolve_document_version(marked_document, resource, "DELETE", original)

        # Update document in database (including version collection if needed)
        id = original[resource_def["id_field"]]
        try:
            app.data.replace(resource, id, marked_document, original)
        except app.data.OriginalChangedError:
            if concurrency_check:
                # 412 Precondition Failed on a stale client ETag.
                abort(412, description="Client and server etags don't match")

        # create previous version if it wasn't already there
        late_versioning_catch(original, resource)