Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def append(self, other):
    """
    Set up appending the documents of other to this dataframe's collection

    :param other: an MDataFrame, or a Collection which is wrapped into an
       MDataFrame before it is validated
    :raises AssertionError: if other is neither of the two
    """
    # NOTE(review): only the validation and the map function setup are
    # part of this block; the statements consuming mrout and mapfn are
    # expected to follow -- confirm against the full method.
    if isinstance(other, Collection):
        # bind the wrapper to `other` itself. It used to be assigned to an
        # unused local, so the assert below rejected every Collection.
        other = MDataFrame(other)
    assert isinstance(
        other, MDataFrame), "both must be MDataFrames, got other={}".format(type(other))
    outname = self.collection.name
    # output spec that names this dataframe's own collection as 'merge' target
    mrout = {
        'merge': outname,
        'nonAtomic': True,
    }
    # map function: assigns a new _id to each document and, where a
    # '_om#rowid' is present, shifts it by the current len(self)
    mapfn = Code("""
function() {
this._id = ObjectId();
if(this['_om#rowid']) {
this['_om#rowid'] += %s;
}
emit(this._id, this);
}
""" % len(self))
in. If not provided a temporary name will be created.
:param suffixes: the suffixes to apply to identical left and right
columns
:param sort: if True the merge results will be sorted. If False the
MongoDB natural order is implied.
:returns: the MDataFrame to the target MDataFrame
"""
# validate input
supported_how = ["left", 'inner', 'right']
assert how in supported_how, "only %s merges are currently supported" % supported_how
for key in [on, left_on, right_on]:
if key:
assert isinstance(
key, six.string_types), "only single column merge keys are supported (%s)" % key
if isinstance(right, Collection):
right = MDataFrame(right)
assert isinstance(
right, MDataFrame), "both must be MDataFrames, got right=%" % type(right)
if how == 'right':
# A right B == B left A
return right.merge(self, on=on, left_on=right_on, right_on=left_on,
how='left', target=target, suffixes=suffixes)
# generate lookup parameters
on = on or '_id'
right_name = self._get_collection_name_of(right, right)
target_name = self._get_collection_name_of(
target, '_temp.merge.%s' % uuid4().hex)
target_field = (
"%s_%s" % (right_name.replace('.', '_'), right_on or on))
lookup = qops.LOOKUP(right_name,
key=on,
left_key=left_on,
:param columns: the column projection as a list of column names
:param lazy: if True returns a lazy representation as an MDataFrame.
If False retrieves all data and returns a DataFrame (default)
:param filter: the filter to be applied as a column__op=value dict
:param version: the version to retrieve (not supported)
:param is_series: if True returns a Series instead of a DataFrame
:param kwargs: remaining kwargs are used as a filter. The filter kwarg
overrides other kwargs.
:return: the retrieved object (DataFrame, Series or MDataFrame)
"""
collection = self.collection(name)
if lazy:
from ..mdataframe import MDataFrame
filter = filter or kwargs
df = MDataFrame(collection, columns=columns).query(**filter)
if is_series:
df = df[0]
else:
# TODO ensure the same processing is applied in MDataFrame
# TODO this method should always use a MDataFrame disregarding lazy
filter = filter or kwargs
if filter:
from .query import Filter
query = Filter(collection, **filter).query
cursor = collection.find(filter=query, projection=columns)
else:
cursor = collection.find(projection=columns)
# restore dataframe
df = cursor_to_dataframe(cursor)
if '_id' in df.columns:
del df['_id']
def __getattr__(self, attr):
    """
    Resolve attribute names that regular attribute lookup did not find

    :param attr: the name of the requested attribute
    :return: the result of self.statfunc(attr) if attr is listed in
       MDataFrame.STATFUNCS, otherwise an MSeries restricted to the
       column named attr
    :raises AttributeError: if attr is neither a stat function nor one of
       self.columns
    """
    # stat functions take precedence over columns of the same name
    if attr in MDataFrame.STATFUNCS:
        return self.statfunc(attr)
    if attr not in self.columns:
        raise AttributeError(attr)
    # a column name => MSeries built from a copy of this object's kwargs
    series_kwargs = self._getcopy_kwargs()
    series_kwargs.update(columns=attr)
    return MSeries(self.collection, **series_kwargs)
indexer = MLocIndexer(self)
return indexer
@property
def iloc(self):
    """
    Return an MPosIndexer for this dataframe

    :return: a new MPosIndexer created on self
    """
    # NOTE(review): presumably discards a previously evaluated result so
    # the indexer starts fresh -- confirm where _evaluated is read
    self._evaluated = None
    return MPosIndexer(self)
def __repr__(self):
    """
    Render the collection name and the copy kwargs of this object

    :return: a string of the form
       MDataFrame(collection=<collection name>, <key>=<value>, ...)
    """
    # one key=value entry per item returned by _getcopy_kwargs()
    pairs = []
    for key, value in six.iteritems(self._getcopy_kwargs()):
        pairs.append('{}={}'.format(key, value))
    return "MDataFrame(collection={collection.name}, {kwargs})".format(
        collection=self.collection, kwargs=', '.join(pairs))
class MSeries(MDataFrame):
"""
Series implementation for MDataFrames
behaves like a DataFrame but limited to one column.
"""
def __init__(self, *args, **kwargs):
    """
    Initialize the series the same way as its MDataFrame base class

    :param args: positional arguments, passed on unchanged to the base
       class initializer and to _apply_mixins
    :param kwargs: keyword arguments, passed on unchanged to the base
       class initializer and to _apply_mixins
    """
    super(MSeries, self).__init__(*args, **kwargs)
    # string form of the concrete class, set before the mixins are applied
    self._applyto = str(self.__class__)
    # flag, True if only unique values apply
    self.is_unique = False
    # mixins receive the same arguments as the constructor
    self._apply_mixins(*args, **kwargs)
def __getitem__(self, cols_or_slice):
if isinstance(cols_or_slice, Filter):
expected_columns = list(project.keys())
project = {"$project": project}
# store merged documents and return an MDataFrame to it
out = qops.OUT(target_name)
pipeline = [lookup, unwind, project]
if sort:
sort_cols = make_list(on or [left_on, right_on])
sort_key = qops.make_sortkey(sort_cols)
sort = qops.SORT(**dict(sort_key))
pipeline.append(sort)
pipeline.append(out)
if inspect:
result = pipeline
else:
result = self.collection.aggregate(pipeline)
result = MDataFrame(self.collection.database[target_name],
force_columns=expected_columns)
return result
def apply(self, fn, inplace=False, preparefn=None):
    """
    Attach fn as the apply_fn of this object or of a copy of it

    :param fn: the function to store as apply_fn
    :param inplace: if True fn is set on this object and this object is
       returned. If False (default) a new object is created from this
       object's collection and copy kwargs, and fn is set on the new object
    :param preparefn: passed as the preparefn kwarg when the new object
       is created. Not used if inplace is True
    :return: the object that carries fn as apply_fn
    """
    if inplace:
        # no copy requested, modify and return this very object
        self.apply_fn = fn
        return self
    copy_kwargs = self._getcopy_kwargs()
    copy_kwargs.update(preparefn=preparefn)
    # a series stays a series, anything else becomes an MDataFrame
    klass = MSeries if isinstance(self, MSeries) else MDataFrame
    clone = klass(self.collection, **copy_kwargs)
    clone.apply_fn = fn
    return clone
:param cols_or_slice: single column (str), multi-columns (list),
slice to select columns or a masked-style
:return: filtered MDataFrame or MSeries
"""
if isinstance(cols_or_slice, six.string_types):
# column name => MSeries
return self._as_mseries(cols_or_slice)
elif isinstance(cols_or_slice, int):
# column number => MSeries
column = self.columns[cols_or_slice]
return self._as_mseries(column)
elif isinstance(cols_or_slice, (tuple, list)):
# list of column names => MDataFrame subset on columns
kwargs = self._getcopy_kwargs()
kwargs.update(columns=cols_or_slice)
return MDataFrame(self.collection, **kwargs)
elif isinstance(cols_or_slice, Filter):
kwargs = self._getcopy_kwargs()
kwargs.update(query=cols_or_slice.query)
return MDataFrame(self.collection, **kwargs)
elif isinstance(cols_or_slice, np.ndarray):
raise NotImplemented
raise ValueError('unknown accessor type %s' % type(cols_or_slice))