Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _amend_pipeline(self, pipeline):
""" amend pipeline with default ops on coll.aggregate() calls """
if self.sort_order:
sort = qops.SORT(**dict(qops.make_sortkey(self.sort_order)))
pipeline.append(sort)
return pipeline
def _amend_pipeline(self, pipeline):
""" amend pipeline with default ops on coll.aggregate() calls """
if self.should_sort:
sort = qops.SORT(**dict(qops.make_sortkey('_id')))
pipeline.append(sort)
return pipeline
def _count(self):
count_columns = self._non_group_columns()
if len(count_columns) == 0:
count_columns.append('_'.join(self.columns) + '_count')
groupby = {
"$group": {
"_id": {k: "$%s" % k for k in self.columns},
}
}
for k in count_columns:
groupby['$group']['%s' % k] = {"$sum": 1}
pipeline = self._amend_pipeline([groupby])
if self.should_sort:
sort = qops.SORT(**dict(qops.make_sortkey('_id')))
pipeline.append(sort)
return list(self.collection.aggregate(pipeline))
# have it from left
continue
if right_col in self.columns:
left_col = '%s%s' % (right_col, suffixes[1])
else:
left_col = '%s' % right_col
project[left_col] = '$%s.%s' % (target_field, right_col)
expected_columns = list(project.keys())
project = {"$project": project}
# store merged documents and return an MDataFrame to it
out = qops.OUT(target_name)
pipeline = [lookup, unwind, project]
if sort:
sort_cols = make_list(on or [left_on, right_on])
sort_key = qops.make_sortkey(sort_cols)
sort = qops.SORT(**dict(sort_key))
pipeline.append(sort)
pipeline.append(out)
if inspect:
result = pipeline
else:
result = self.collection.aggregate(pipeline)
result = MDataFrame(self.collection.database[target_name],
force_columns=expected_columns)
return result