# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment of a __setitem__ method -- the original indentation
# was lost in this chunk (all lines flattened to column 0), and only the
# scalar-assignment branch is visible; handling of non-scalar values is
# presumably truncated below -- confirm against the full file.
def __setitem__(self, column, value):
# True for any scalar type, numeric, bool, string
if np.isscalar(value):
# broadcast the scalar: $set the same value on every document matching
# the standing filter. The returned UpdateResult is assigned but unused.
result = self.collection.update_many(filter=self.filter_criteria,
update=qops.SET(column, value))
# record the new column locally so subsequent projections include it
self.columns.append(column)
# return self to allow fluent chaining
return self
# NOTE(review): truncated fragment of a merge() implementation -- the
# enclosing def line is missing above, the right-column loop is cut off at
# the bottom, and indentation has been flattened. Comments below describe
# only what the visible code shows.
# delegate to the right-hand frame with sides swapped (this looks like the
# how='right' branch, implemented as a left merge from the other side --
# TODO confirm against the surrounding function)
return right.merge(self, on=on, left_on=right_on, right_on=left_on,
how='left', target=target, suffixes=suffixes)
# generate lookup parameters
# default join key is MongoDB's _id
on = on or '_id'
right_name = self._get_collection_name_of(right, right)
# merge result lands in a uniquely named temporary collection unless a
# target name was given explicitly
target_name = self._get_collection_name_of(
target, '_temp.merge.%s' % uuid4().hex)
# field under which $lookup stores the matched right-side documents,
# e.g. "<right_collection>_<join_key>" with dots made safe
target_field = (
"%s_%s" % (right_name.replace('.', '_'), right_on or on))
# build the $lookup stage joining right_name into target_field
lookup = qops.LOOKUP(right_name,
key=on,
left_key=left_on,
right_key=right_on,
target=target_field)
# unwind merged documents from arrays to top-level document fields
# (preserveNullAndEmptyArrays except for inner joins, so non-matching
# left rows survive unless how == 'inner')
unwind = qops.UNWIND(target_field, preserve=how != 'inner')
# get all fields from left, right
project = {}
for left_col in self.columns:
source_left_col = left_col
if left_col == '_id':
# _id is passed through as-is
project[left_col] = 1
continue
# internal bookkeeping columns are excluded from the result
if left_col.startswith('_idx'):
continue
if left_col.startswith('_om#'):
continue
# on a name collision with the right frame (other than the join key),
# disambiguate the left column with the first suffix
if left_col != (on or left_on) and left_col in right.columns:
left_col = '%s%s' % (left_col, suffixes[0])
project[left_col] = "$%s" % source_left_col
# NOTE(review): the body of this loop is truncated in this chunk
for right_col in right.columns:
if right_col == '_id':
def aggregate(self, pipeline, filter=None, **kwargs):
query = dict(self.query)
query.update(filter or {})
pipeline.insert(0, qops.MATCH(query))
return super(FilteredCollection, self).aggregate(pipeline, **kwargs)
# NOTE(review): orphan def line -- the body of find() is missing from this
# chunk; the lines that follow belong to a different method.
def find(self, filter=None, **kwargs):
# NOTE(review): the lines below are the tail of another method's docstring
# and body (apparently a grouper's agg(specs) -- its def line and docstring
# opening are not visible here, and the method is truncated at the bottom).
:param specs: a dictionary of { column : function | list[functions] }
pairs.
"""
# build one accumulator entry per (column, stat) pair, translating the
# pandas-style stat name to a Mongo accumulator via STATS_MAP
# (falling back to the name itself), e.g. amount_mean -> {$avg: $amount}
def add_stats(specs, column, stat):
specs['%s_%s' % (column, stat)] = {
'$%s' % MGrouper.STATS_MAP.get(stat, stat): '$%s' % column}
# generate $group command
_specs = {}
for column, stats in six.iteritems(specs):
# accept a single stat or a list of stats per column
stats = make_tuple(stats)
for stat in stats:
add_stats(_specs, column, stat)
# group by the grouper's columns, computing all requested accumulators
groupby = qops.GROUP(columns=self.columns,
**_specs)
# execute and return a dataframe
pipeline = self._amend_pipeline([groupby])
# allowDiskUse so large group results do not hit Mongo's memory limit
data = self.collection.aggregate(pipeline, allowDiskUse=True)
def get_data():
# we need this to build a pipeline for from_records
# to process, otherwise the cursor will be exhausted already
for group in data:
# flatten the compound _id back into top-level group-key fields
_id = group.pop('_id')
if isinstance(_id, dict):
group.update(_id)
yield group
df = pd.DataFrame.from_records(get_data())
# NOTE(review): method is truncated here in this chunk
columns = make_list(self.columns)