Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
All elements of the stream are assumed to be lists or tuples.
Examples
--------
>>> source = Stream()
>>> batches = source.to_batch()
>>> L = batches.pluck('value').map(inc).sum().stream.sink_to_list()
>>> source.emit([{'name': 'Alice', 'value': 1},
... {'name': 'Bob', 'value': 2},
... {'name': 'Charlie', 'value': 3}])
>>> source.emit([{'name': 'Alice', 'value': 4},
... {'name': 'Bob', 'value': 5},
... {'name': 'Charlie', 'value': 6}])
"""
from .batch import Batch
return Batch(stream=self, **kwargs)
def _pluck(seq, ind):
    """Pluck ``ind`` out of every element of ``seq`` and return a list.

    Thin wrapper over ``toolz.pluck`` that materialises its result.

    Parameters
    ----------
    seq : iterable
        The elements to pluck from.
    ind : object
        The index/key (or list of them) handed to ``toolz.pluck``.

    Returns
    -------
    list
        The plucked values, in iteration order of ``seq``.
    """
    plucked = toolz.pluck(ind, seq)
    return list(plucked)
def _map_map(seq, func, **kwargs):
return list(map(func, seq, **kwargs))
def _accumulate_sum(accumulator, new):
return accumulator + sum(new)
# The concrete type of whatever the builtin ``map`` returns on this
# interpreter, obtained by mapping an identity function over an empty input.
map_type = type(map(lambda item: item, ()))
# Append the pair ((list, tuple, set), Batch) to the 'streaming' entry of
# _stream_types.
# NOTE(review): presumably this registry is consulted to choose Batch for
# streams whose example value is a list, tuple, or set -- confirm against the
# code that reads _stream_types.
_stream_types['streaming'].append(((list, tuple, set), Batch))
def __init__(self, stream=None, example=None):
    """Initialise the batch, defaulting ``example`` to an empty list.

    Parameters
    ----------
    stream : optional
        Passed through unchanged to the parent initialiser.
    example : optional
        Passed to the parent initialiser; when ``None`` a fresh empty list
        is used instead.
    """
    # A new list is built per call so instances never share a default.
    initial_example = [] if example is None else example
    super(Batch, self).__init__(stream=stream, example=initial_example)