Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_getitem(self):
    """
    Check DataFrame indexing.

    Column names index to Column objects, a comparison on a column yields a
    BooleanFilter, and indexing by that filter yields a DataFrame whose
    filter builds to the expected range query.
    """
    df = create_df_from_es()
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)) which only reports "False is not true".
    self.assertIsInstance(df['a'], Column)
    self.assertIsInstance(df['b'], Column)

    expr = df['a'] > 2
    self.assertIsInstance(expr, BooleanFilter)

    # Index once and reuse the result for both checks.
    filtered = df[expr]
    self.assertIsInstance(filtered, DataFrame)
    self.assertEqual(filtered._filter.build(), {'range': {'a': {'gt': 2}}})
def count(self):
    """
    Count documents per group.

    Builds a new DataFrame that copies this frame's client, index, doc_type,
    mapping, filter, grouping, projection, sort, limit and compat settings,
    and uses the count aggregator as its aggregation. The return value is
    that DataFrame; the list in the example below is presumably the
    per-group counts it yields (confirm against DataFrame's repr/collect).

    >>> df.groupby(df.gender).count()
    [2, 1]
    """
    inherited = dict(
        client=self._client,
        index=self._index,
        doc_type=self._doc_type,
        mapping=self._mapping,
        filter=self._filter,
        groupby=self._groupby,
        projection=self._projection,
        sort=self._sort,
        limit=self._limit,
        compat=self._compat,
    )
    return DataFrame(aggregation=_count_aggregator, **inherited)
>>> df.filter(df['age'] < 13).collect()
[Row(age=12,gender='female',name='Alice'), Row(age=11,gender='male',name='Bob')]
"""
# Normalize `condition` to a filter object: a string is wrapped in a
# ScriptFilter, a BooleanFilter is used as-is, and any other type raises
# TypeError.
if isinstance(condition, six.string_types):
_filter = ScriptFilter(condition)
elif isinstance(condition, BooleanFilter):
_filter = condition
else:
raise TypeError('{0} is supposed to be str or BooleanFilter'.format(condition))
# Chaining filters is treated as AND: an existing self._filter is combined
# with the new one via `&`.
if self._filter is not None:
_filter = (self._filter & _filter)
# Return a new DataFrame that copies every setting from self except
# `filter`, which becomes the (possibly combined) _filter.
return DataFrame(client=self._client,
index=self._index,
doc_type=self._doc_type,
mapping=self._mapping,
filter=_filter,
groupby=self._groupby,
aggregation=self._aggregation,
projection=self._projection,
sort=self._sort,
limit=self._limit,
compat=self._compat)
orderby() is an alias for sort().
>>> df.sort(df['age'].asc).collect()
[Row(age=11,name='Bob'), Row(age=12,name='Alice'), Row(age=13,name='Leo')]
"""
# Convert each sort key to its built form: strings go through
# ScriptSorter, Sorter instances are built directly, and any other type
# raises TypeError. Keys keep the order in which they were passed.
sorts = []
for col in cols:
if isinstance(col, six.string_types):
sorts.append(ScriptSorter(col).build())
elif isinstance(col, Sorter):
sorts.append(col.build())
else:
raise TypeError('{0} is supposed to be str or Sorter'.format(col))
# The new `sorts` list replaces self._sort (it is not appended to it);
# every other setting is copied from self.
return DataFrame(client=self._client,
index=self._index,
doc_type=self._doc_type,
mapping=self._mapping,
filter=self._filter,
groupby=self._groupby,
aggregation=self._aggregation,
projection=self._projection,
sort=sorts,
limit=self._limit,
compat=self._compat)
def _get_mappings(self, json_map, index_name):
    """
    Resolve the field mappings of ``index_name`` from a mapping response.

    With compat >= 7 the properties are read from
    ``json_map[index_name]["mappings"]["properties"]``; for older versions
    they are nested one level deeper under ``self._doc_type``.

    :raises DataFrameException: if compat < 7 and no doc_type is set
        (raised before ``json_map`` is inspected).
    """
    # Decide the extra key(s) between "mappings" and "properties" first,
    # so the doc_type check happens before any lookup in json_map.
    if self._compat >= 7:
        type_path = ()
    elif self._doc_type is not None:
        type_path = (self._doc_type,)
    else:
        raise DataFrameException('Please specify doc_type for ES version under 7')

    node = json_map[index_name]["mappings"]
    for key in type_path:
        node = node[key]
    return DataFrame.resolve_mappings(node["properties"])
"""
columns = []
if len(cols) == 1 and isinstance(cols[0], Grouper):
groupby = cols[0].build()
else:
for col in cols:
if isinstance(col, six.string_types):
columns.append(getattr(self, col))
elif isinstance(col, Column):
columns.append(col)
else:
raise TypeError('{0} is supposed to be str or Column'.format(col))
names = [col.field_name() for col in columns]
groupby = Grouper.from_list(names).build()
return DataFrame(client=self._client,
index=self._index,
doc_type=self._doc_type,
mapping=self._mapping,
filter=self._filter,
groupby=groupby,
aggregation=self._aggregation,
projection=self._projection,
sort=self._sort,
limit=self.limit,
compat=self._compat)
Projects a set of columns and returns a new :class:`DataFrame`
:param cols: list of column names or :class:`Column`.
>>> df.filter(df['age'] < 25).select('name', 'age').collect()
[Row(age=12,name='Alice'), Row(age=11,name='Bob'), Row(age=13,name='Leo')]
"""
# Resolve each requested column: a string is looked up as an attribute of
# this DataFrame, a Column is used as-is, and any other type raises TypeError.
projection = []
for col in cols:
if isinstance(col, six.string_types):
projection.append(getattr(self, col))
elif isinstance(col, Column):
projection.append(col)
else:
raise TypeError('{0} is supposed to be str or Column'.format(col))
# The resolved columns replace self._projection in the returned DataFrame;
# every other setting is copied from self.
return DataFrame(client=self._client,
index=self._index,
doc_type=self._doc_type,
mapping=self._mapping,
filter=self._filter,
groupby=self._groupby,
aggregation=self._aggregation,
projection=projection,
sort=self._sort,
limit=self._limit,
compat=self._compat)
def _get_mappings(self, json_map, index_name):
    """
    Resolve the field mappings of ``index_name`` from a mapping response.

    With compat >= 7 the properties are read from
    ``json_map[index_name]["mappings"]["properties"]``; for older versions
    they are nested one level deeper under ``self._doc_type``.

    :raises DataFrameException: if compat < 7 and no doc_type is set
        (raised before ``json_map`` is inspected).
    """
    # Decide the extra key(s) between "mappings" and "properties" first,
    # so the doc_type check happens before any lookup in json_map.
    if self._compat >= 7:
        type_path = ()
    elif self._doc_type is not None:
        type_path = (self._doc_type,)
    else:
        raise DataFrameException('Please specify doc_type for ES version under 7')

    node = json_map[index_name]["mappings"]
    for key in type_path:
        node = node[key]
    return DataFrame.resolve_mappings(node["properties"])
def agg(self, *aggs):
    """
    Aggregate on the entire DataFrame without groups.

    :param aggs: :class:`Aggregator` objects. Their built aggregation
        mappings are merged with ``dict.update``, so a later aggregator
        overrides an earlier one on a duplicate key.
    :returns: a new DataFrame whose aggregation is the merged mapping; every
        other setting is copied from this DataFrame.
    :raises TypeError: if any argument is not an Aggregator.

    >>> df[df['gender'] == 'male'].agg(df['age'].avg).collect()
    [Row(avg(age)=12)]
    """
    aggregation = {}
    for agg in aggs:
        # Explicit check rather than `assert`, which is stripped under
        # `python -O`. This matches the TypeError style of the other builders.
        if not isinstance(agg, Aggregator):
            raise TypeError('{0} is supposed to be Aggregator'.format(agg))
        aggregation.update(agg.build())
    return DataFrame(client=self._client,
                     index=self._index,
                     doc_type=self._doc_type,
                     mapping=self._mapping,
                     filter=self._filter,
                     groupby=self._groupby,
                     aggregation=aggregation,
                     projection=self._projection,
                     sort=self._sort,
                     limit=self._limit,
                     compat=self._compat)