Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_cursor(self):
    """Return a pymongo cursor with projection, sort, limit and skip applied."""
    explicit_sort = bool(self.sort_order)
    # always project the requested columns plus the frame's index fields
    fields = make_tuple(self.columns)
    fields += make_tuple(self._get_frame_index())
    if not explicit_sort:
        # implicit sort -- the om bookkeeping fields must be projected too
        fields += make_tuple(self._get_frame_om_fields())
    cursor = self.collection.find(projection=fields)
    if explicit_sort:
        cursor.sort(qops.make_sortkey(make_tuple(self.sort_order)))
    if self.head_limit:
        cursor.limit(self.head_limit)
    if self.skip_topn:
        cursor.skip(self.skip_topn)
    return cursor
def _get_cursor(self):
    """Create the MongoDB cursor for this frame.

    Projects the selected columns and the frame index; when no explicit
    sort order is set, the om bookkeeping fields are also projected
    (to support the implicit sort). Sort, limit and skip are applied
    only when the corresponding attribute is set (truthy).

    :return: a pymongo cursor over ``self.collection``
    """
    projection = make_tuple(self.columns)
    projection += make_tuple(self._get_frame_index())
    if not self.sort_order:
        # implicit sort
        projection += make_tuple(self._get_frame_om_fields())
    cursor = self.collection.find(projection=projection)
    if self.sort_order:
        cursor.sort(qops.make_sortkey(make_tuple(self.sort_order)))
    if self.head_limit:
        cursor.limit(self.head_limit)
    if self.skip_topn:
        cursor.skip(self.skip_topn)
    return cursor
def inner(self, other, *args):
# get all values passed and build terms from them
values = list(make_tuple(other) + args)
terms = []
for term in values:
if isinstance(term, six.string_types):
# if the term is a column name, add as a column name
if term in self.columns:
term = '$' + term
# allow to specify values explicitely by $$ =>
term = term.replace('$$', '')
terms.append(term)
# limit number of terms if requested
if max_terms:
terms = terms[:max_terms]
# add projection of output columns to operator
mapping = {
col: {
op: terms if base is None else ['$' + col] + terms,
See the following link for a list of supported operations.
https://docs.mongodb.com/manual/reference/operator/aggregation/group/
:param specs: a dictionary of { column : function | list[functions] }
pairs.
"""
def add_stats(specs, column, stat):
specs['%s_%s' % (column, stat)] = {
'$%s' % MGrouper.STATS_MAP.get(stat, stat): '$%s' % column}
# generate $group command
_specs = {}
for column, stats in six.iteritems(specs):
stats = make_tuple(stats)
for stat in stats:
add_stats(_specs, column, stat)
groupby = qops.GROUP(columns=self.columns,
**_specs)
# execute and return a dataframe
pipeline = self._amend_pipeline([groupby])
data = self.collection.aggregate(pipeline, allowDiskUse=True)
def get_data():
# we need this to build a pipeline for from_records
# to process, otherwise the cursor will be exhausted already
for group in data:
_id = group.pop('_id')
if isinstance(_id, dict):
group.update(_id)
yield group
if (isinstance(specs, enumerable_types)
and isscalar(specs[0]) and len(idx_cols) == 1
and not any(isinstance(s, slice) for s in specs)):
# single column index with list of scalar values
if (self.positional and isinstance(specs, tuple) and len(specs) == 2
and all(isscalar(v) for v in specs)):
# iloc[int, int] is a cell access
flt_kwargs[idx_cols[0]] = specs[0]
projection.extend(self._get_projection(specs[1]))
else:
flt_kwargs['{}__in'.format(idx_cols[0])] = specs
self._from_range = True
elif isinstance(specs, (int, str)):
flt_kwargs[idx_cols[0]] = specs
else:
specs = make_tuple(specs)
# list/tuple of slices or scalar values, or MultiIndex
for i, spec in enumerate(specs):
if i < len(idx_cols):
col = idx_cols[i]
if isinstance(spec, slice):
self._from_range = True
start, stop = spec.start, spec.stop
if start is not None:
flt_kwargs['{}__gte'.format(col)] = start
if stop is not None:
if isinstance(stop, int):
stop -= int(self.positional)
flt_kwargs['{}__lte'.format(col)] = stop
elif isinstance(spec, enumerable_types) and isscalar(spec[0]):
self._from_range = True
# single column index with list of scalar values
def inner(self, columns=None):
    """Project every selected column through the enclosing operator.

    ``op`` is supplied by the enclosing scope (presumably the MongoDB
    operator name -- confirm against the factory that defines this inner).

    :param columns: optional column name(s); defaults to ``self.columns``
    :return: self, for chaining
    """
    selected = make_tuple(columns or self.columns)
    # build { column: { <op>: '$column' } } for each selected column
    mapping = {}
    for name in selected:
        mapping[name] = {op: '$' + name}
    self.project(mapping)
    return self
def row_to_doc(obj):
    """Yield one document per group of *obj*, keyed by the closure's ``groupby`` columns."""
    for key, frame in obj.groupby(groupby):
        # normalize the group key to plain python objects before zipping
        if hasattr(key, 'astype'):
            key = key.astype('O')
        values = make_tuple(key)
        doc = dict(zip(groupby, values))
        # every non-group column goes into the _data records
        data_columns = list(set(frame.columns) - set(groupby))
        doc['_data'] = frame[data_columns].astype('O').to_dict('records')
        yield doc
def inner(self, columns=None):
    """Add a ``<column>_<opname>`` accumulator per column to the $group stage.

    ``op`` and ``opname`` come from the enclosing scope (presumably the
    aggregation operator and its display name -- confirm in the factory).

    :param columns: optional column name(s); defaults to ``self.columns``
    :return: self, for chaining
    """
    selected = make_tuple(columns or self.columns)
    group_stage = self._getGroupBy(by='$$last')['$group']
    for col in selected:
        # e.g. {'price_sum': {'$sum': '$price'}}
        group_stage['{}_{}'.format(col, opname)] = {op: '$' + col}
    self.computed.extend(group_stage.keys())
    self.project_keeper_columns()
    return self
def _getcopy_kwargs(self, without=None):
    """Return all parameters required on a copy of this MDataFrame.

    :param without: optional name or tuple of names of keyword arguments
       to omit from the result (e.g. ``'sort_order'``). An unknown name
       raises ``KeyError``, as before.
    :return: dict of keyword arguments suitable to construct an
       equivalent frame
    """
    kwargs = dict(columns=self.columns,
                  sort_order=self.sort_order,
                  limit=self.head_limit,
                  skip=self.skip_topn,
                  from_loc_indexer=self.from_loc_indexer,
                  immediate_loc=self.immediate_loc,
                  query=self.filter_criteria,
                  auto_inspect=self.auto_inspect,
                  preparefn=self._preparefn)
    # plain loop instead of a side-effect list comprehension, which
    # built and discarded a throwaway list
    for key in make_tuple(without or []):
        kwargs.pop(key)
    return kwargs
def _as_mseries(self, column):
    """Return an MSeries view over a single column of this frame."""
    series_kwargs = self._getcopy_kwargs()
    # narrow the projection down to just the requested column
    series_kwargs.update(columns=make_tuple(column))
    return MSeries(self.collection, **series_kwargs)