Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_search_via_source_param_and_with_source_projection(self):
    """Search through the ``source`` argument while also sending ``projections``.

    Two documents are inserted and only one is expected to match the
    ``query_string`` search for ``foo``. The returned document must carry
    ``name`` (the only field listed in ``projections``) and must not carry
    ``description``.
    """
    query = {'query': {'query_string': {'query': 'foo'}}}
    with self.app.app_context():
        self.app.data.insert('items_with_description', [{'uri': 'foo',
                                                         'description': 'This is foo',
                                                         'name': 'foo'}])
        self.app.data.insert('items_with_description', [{'uri': 'bar', 'name': 'bar'}])
        req = ParsedRequest()
        req.args = {'source': json.dumps(query), 'projections': json.dumps(["name"])}
        res = self.app.data.find('items_with_description', req, None)
        self.assertEqual(1, res.count())
        first_doc = res.docs[0]
        # assertIn/assertNotIn print the offending document on failure,
        # whereas assertTrue(... in ...) only reports "False is not true".
        self.assertNotIn('description', first_doc)
        self.assertIn('name', first_doc)
def test_sub_resource_lookup_with_schema_filter(self):
    """Check that the lookup passed to ``find`` narrows the counted documents.

    A single document named ``foo`` is inserted; a lookup on that name
    must count one hit, while a lookup on ``bar`` must count none.
    """
    resource = 'items_with_description'
    with self.app.app_context():
        self.app.data.insert(resource, [{'uri': 'foo', 'description': 'test', 'name': 'foo'}])
        empty_request = ParsedRequest()
        empty_request.args = {}
        # (expected hit count, name used as the lookup value)
        for expected_hits, name in ((1, 'foo'), (0, 'bar')):
            cursor = self.app.data.find(resource, empty_request, {'name': name})
            self.assertEqual(expected_hits, cursor.count())
def test_should_highlight(self):
    """Exercise ``should_highlight`` with the ``es_highlight`` request argument.

    The integer ``1`` must be reported as truthy and the string ``'0'``
    as falsy.
    """
    with self.app.app_context():
        parsed = ParsedRequest()

        parsed.args = {'es_highlight': 1}
        highlight_on = self.app.data.should_highlight(parsed)
        self.assertTrue(highlight_on)

        # Same request object, re-used with the flag switched off.
        parsed.args = {'es_highlight': '0'}
        highlight_off = self.app.data.should_highlight(parsed)
        self.assertFalse(highlight_off)
def test_get_projected_fields(self):
    """``get_projected_fields`` must return the projected names comma-joined.

    The ``projections`` argument is sent as a JSON-encoded list and the
    result is compared with the same names, in order, as a single string.
    """
    with self.app.app_context():
        projected_names = ["priority", "urgency", "word_count", "slugline", "highlights"]
        parsed = ParsedRequest()
        parsed.args = {'projections': json.dumps(projected_names)}
        self.assertEqual(
            self.app.data.get_projected_fields(parsed),
            "priority,urgency,word_count,slugline,highlights",
        )
def test_search_via_source_param(self):
    """Search through the ``source`` request argument with a ``term`` query.

    Two documents are inserted (``foo`` and ``bar``); the query on
    ``uri == 'foo'`` is expected to count exactly one result.
    """
    term_query = {'query': {'term': {'uri': 'foo'}}}
    with self.app.app_context():
        # Each document is inserted with its own call, one after the other.
        for value in ('foo', 'bar'):
            self.app.data.insert('items', [{'uri': value, 'name': value}])

        parsed = ParsedRequest()
        parsed.args = {'source': json.dumps(term_query)}
        cursor = self.app.data.find('items', parsed, None)
        self.assertEqual(1, cursor.count())
Support for custom query parameters via configuration settings.
Minor DRY updates.
.. versionchanged:: 0.1.0
Support for embedded documents.
.. versionchanged:: 0.0.6
projection queries ('?projection={"name": 1}')
.. versionchanged:: 0.0.5
Support for optional filters, sorting and pagination.
"""
args = request.args
headers = request.headers
r = ParsedRequest()
r.args = args
settings = config.DOMAIN[resource]
if settings["allowed_filters"]:
r.where = args.get(config.QUERY_WHERE)
if settings["projection"]:
r.projection = args.get(config.QUERY_PROJECTION)
if settings["sorting"]:
r.sort = args.get(config.QUERY_SORT)
if settings["embedding"]:
r.embedded = args.get(config.QUERY_EMBEDDED)
if settings["datasource"]["aggregation"]:
r.aggregation = args.get(config.QUERY_AGGREGATION)
r.show_deleted = config.SHOW_DELETED_PARAM in args
def _get_combined_view_data(self, items, request):
    """Get the list of events and planning items for the combined view.

    :param items: iterable of mapping-like objects; each contributes its
        ``event_item`` value when that is truthy, otherwise its ``_id``
    :param request: object representing the HTTP request; ``request.page``
        is read here and the request is handed to the date-filter and
        page-size helpers
    :return: the result of the ``planning_search`` service ``get`` call
    """
    # Prefer the linked event id: related planning items are not wanted
    # as separate entries.
    ids = {item.get('event_item') or item.get('_id') for item in items}

    filters = self._get_date_filters(request)
    # Resolve paging once so 'size'/'from' in the query and the
    # page/max_results attributes on the request always agree.
    page = request.page or 1
    page_size = self._get_page_size(request)

    req = ParsedRequest()
    req.args = MultiDict()
    req.args['source'] = json.dumps({
        'query': {
            'bool': {
                'must': [{'terms': {'_id': list(ids)}}],
            }
        },
        'filter': filters,
        'sort': self._get_sort(),
        'size': page_size,
        'from': (page - 1) * page_size
    })
    req.page = page
    req.max_results = page_size
    return get_resource_service('planning_search').get(req=req, lookup=None)
""""""
query = {
'query': {
'bool': {
'must': [
{'term': {'pubstatus': 'usable'}},
{'terms': {'state': ['scheduled', 'postponed', 'rescheduled']}}
]
}
},
'sort': [
{'versioncreated': {'order': 'asc'}}
],
'size': 0
}
req = ParsedRequest()
req.args = {'source': json.dumps(query)}
cursor = fetch_callback(req=req, lookup=None)
total_documents = cursor.count()
if total_documents > 0:
query['size'] = self.page_size
total_pages = len(range(0, total_documents, self.page_size))
for page_num in range(0, total_pages):
query['from'] = page_num * self.page_size
req = ParsedRequest()
req.args = {'source': json.dumps(query)}
cursor = fetch_callback(req=req, lookup=None)
yield list(cursor)
def get_series(self, query, sort, max_results):
    """Yield matching documents one by one, fetching them page by page.

    :param query: filter that is JSON-encoded into the request's ``where``
    :param sort: value assigned to the request's ``sort``
    :param max_results: value assigned to the request's ``max_results``
        for every page
    :return: generator over the documents of every non-empty page, in the
        order ``get_from_mongo`` returns them
    """
    current_page = 1
    while True:
        # Build a fresh request for the page that is about to be fetched.
        paged_request = ParsedRequest()
        paged_request.sort = sort
        paged_request.where = json.dumps(query)
        paged_request.max_results = max_results
        paged_request.page = current_page

        batch = list(self.get_from_mongo(req=paged_request, lookup=None))
        if not batch:
            # An empty page ends the series.
            return

        current_page += 1

        # Hand the documents to the consumer one at a time.
        for document in batch:
            yield document
query = {
'query': {
'filtered': {
'filter': {
'bool': {
'must': {
'term': {'assignment_id': str(assignment[config.ID_FIELD])}
},
}
}
}
}
}
req = ParsedRequest()
repos = 'archive,published,archived'
req.args = {'source': json.dumps(query), 'repo': repos}
return get_resource_service('search').get(req=req, lookup=None)