    Every element from any of the upstream streams will immediately flow
    into the output stream. They will not be combined with elements from
    other streams.
See also
--------
Stream.zip
Stream.combine_latest
"""
def __init__(self, *upstreams, **kwargs):
super(union, self).__init__(upstreams=upstreams, **kwargs)
def update(self, x, who=None):
return self._emit(x)
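# A hedged usage sketch of ``union`` (assumes the Stream API in this module;
# the helper name is illustrative, not part of the library):
def _example_union():
    a = Stream()
    b = Stream()
    a.union(b).sink(print)
    a.emit(1)   # prints 1
    b.emit(2)   # prints 2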
@Stream.register_api()
class pluck(Stream):
""" Select elements from elements in the stream.
Parameters
----------
pluck : object, list
The element(s) to pick from the incoming element in the stream
If an instance of list, will pick multiple elements.
Examples
--------
>>> source = Stream()
>>> source.pluck([0, 3]).sink(print)
>>> for x in [[1, 2, 3, 4], [4, 5, 6, 7], [8, 9, 10, 11]]:
... source.emit(x)
(1, 4)
    (4, 7)
    (8, 11)
    """
    def __init__(self, upstream, pick, **kwargs):
        self.pick = pick
        Stream.__init__(self, upstream, **kwargs)

    def update(self, x, who=None):
        if isinstance(self.pick, list):
            return self._emit(tuple([x[ind] for ind in self.pick]))
        else:
            return self._emit(x[self.pick])


@Stream.register_api()
class sink(Stream):
    """ Apply a function on every element """
    _graphviz_shape = 'trapezium'

    def __init__(self, upstream, func, *args, **kwargs):
        self.func = func
        # take the stream-specific kwargs out before storing the rest
        stream_name = kwargs.pop('stream_name', None)
        self.kwargs = kwargs
self.args = args
Stream.__init__(self, upstream, stream_name=stream_name)
_global_sinks.add(self)
def update(self, x, who=None):
result = self.func(x, *self.args, **self.kwargs)
if gen.isawaitable(result):
return result
else:
return []
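# A hedged usage sketch of ``sink`` (assumes the Stream API in this module;
# the helper name is illustrative):
def _example_sink():
    source = Stream()
    results = []
    source.sink(results.append)
    source.emit(1)
    source.emit(2)
    assert results == [1, 2]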
@Stream.register_api()
class map(Stream):
""" Apply a function to every element in the stream
Parameters
----------
func: callable
*args :
The arguments to pass to the function.
**kwargs:
Keyword arguments to pass to func
Examples
--------
>>> source = Stream()
>>> source.map(lambda x: 2*x).sink(print)
>>> for i in range(5):
    ...     source.emit(i)
    0
    2
    4
    6
    8
    """
    def __init__(self, upstream, func, *args, **kwargs):
        self.func = func
        # this is one of a few stream-specific kwargs
        stream_name = kwargs.pop('stream_name', None)
        self.kwargs = kwargs
        self.args = args

        Stream.__init__(self, upstream, stream_name=stream_name)

    def update(self, x, who=None):
        result = self.func(x, *self.args, **self.kwargs)
        return self._emit(result)


@Stream.register_api()
class zip_latest(Stream):
    """ Combine multiple streams together to a stream of tuples

    The stream which this is called from is lossless: all of its elements
    are emitted, each combined with the last-seen element from every other
    stream.
    """
    def __init__(self, lossless, *upstreams, **kwargs):
        upstreams = (lossless,) + upstreams
        self.last = [None for _ in upstreams]
        self.missing = set(upstreams)
        self.lossless = lossless
        self.lossless_buffer = deque()
        Stream.__init__(self, upstreams=upstreams, **kwargs)

    def update(self, x, who=None):
        idx = self.upstreams.index(who)
        if who is self.lossless:
            self.lossless_buffer.append(x)
self.last[idx] = x
if self.missing and who in self.missing:
self.missing.remove(who)
if not self.missing:
L = []
while self.lossless_buffer:
self.last[0] = self.lossless_buffer.popleft()
L.append(self._emit(tuple(self.last)))
return L
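# A hedged usage sketch of ``zip_latest`` (assumes the class above): stream
# ``a`` is lossless, ``b`` only contributes its most recent value.
def _example_zip_latest():
    a = Stream()
    b = Stream()
    a.zip_latest(b).sink(print)
    b.emit('x')
    a.emit(1)    # prints (1, 'x')
    a.emit(2)    # prints (2, 'x')
    b.emit('y')
    a.emit(3)    # prints (3, 'y')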
@Stream.register_api()
class latest(Stream):
""" Drop held-up data and emit the latest result
This allows you to skip intermediate elements in the stream if there is
some back pressure causing a slowdown. Use this when you only care about
the latest elements, and are willing to lose older data.
This passes through values without modification otherwise.
Examples
--------
>>> source.map(f).latest().map(g) # doctest: +SKIP
"""
_graphviz_shape = 'octagon'
    def __init__(self, upstream, loop=None):
        loop = loop or upstream.loop or IOLoop.current()
        self.condition = Condition()
        self.next = []

        Stream.__init__(self, upstream, loop=loop)

        self.loop.add_callback(self.cb)

    def update(self, x, who=None):
        self.next = [x]
        self.loop.add_callback(self.condition.notify)

    @gen.coroutine
    def cb(self):
        while True:
            yield self.condition.wait()
            [x] = self.next
            yield self._emit(x)


@Stream.register_api()
class rate_limit(Stream):
    """ Limit the flow of data

    This stops two elements from streaming through in an interval shorter
    than the provided value.

    Parameters
    ----------
    interval: float
        Time in seconds
    """
    def __init__(self, upstream, interval, **kwargs):
        self.interval = convert_interval(interval)
        self.next = 0

        Stream.__init__(self, upstream, **kwargs)
@gen.coroutine
def update(self, x, who=None):
now = time()
old_next = self.next
self.next = max(now, self.next) + self.interval
if now < old_next:
yield gen.sleep(old_next - now)
yield self._emit(x)
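# A hedged usage sketch of ``rate_limit`` (assumes the class above; the
# spacing only takes effect while the stream's IOLoop is running):
def _example_rate_limit():
    source = Stream()
    source.rate_limit(0.5).sink(print)   # emissions spaced >= 0.5 s apart
    for i in range(3):
        source.emit(i)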
@Stream.register_api()
class buffer(Stream):
""" Allow results to pile up at this point in the stream
This allows results to buffer in place at various points in the stream.
This can help to smooth flow through the system when backpressure is
applied.
"""
_graphviz_shape = 'diamond'
def __init__(self, upstream, n, loop=None, **kwargs):
loop = loop or upstream.loop or IOLoop.current()
self.queue = Queue(maxsize=n)
Stream.__init__(self, upstream, loop=loop, **kwargs)
self.loop.add_callback(self.cb)
    def update(self, x, who=None):
        return self.queue.put(x)

    @gen.coroutine
    def cb(self):
        while True:
            x = yield self.queue.get()
            yield self._emit(x)


@Stream.register_api()
class flatten(Stream):
    """ Flatten streams of lists or iterables into a stream of elements

    See also
    --------
partition
"""
def update(self, x, who=None):
L = []
for item in x:
y = self._emit(item)
if type(y) is list:
L.extend(y)
else:
L.append(y)
return L
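# A hedged usage sketch of ``flatten`` (assumes the class above):
def _example_flatten():
    source = Stream()
    source.flatten().sink(print)
    source.emit([1, 2, 3])   # prints 1, 2, 3 on separate lines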
@Stream.register_api()
class unique(Stream):
""" Avoid sending through repeated elements
This deduplicates a stream so that only new elements pass through.
You can control how much of a history is stored with the ``history=``
parameter. For example setting ``history=1`` avoids sending through
elements when one is repeated right after the other.
Examples
--------
>>> source = Stream()
>>> source.unique(history=1).sink(print)
>>> for x in [1, 1, 2, 2, 2, 1, 3]:
... source.emit(x)
1
2
    1
    3
    """
    def __init__(self, upstream, history=None, key=identity, **kwargs):
        self.seen = dict()
        self.key = key
        if history:
            from zict import LRU
            self.seen = LRU(history, self.seen)

        Stream.__init__(self, upstream, **kwargs)

    def update(self, x, who=None):
        y = self.key(x)
        if y not in self.seen:
            self.seen[y] = True
            return self._emit(x)
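# A hedged usage sketch of ``unique`` with a key function (assumes the class
# above): deduplicate case-insensitively.
def _example_unique_key():
    source = Stream()
    source.unique(key=str.lower).sink(print)
    for word in ['Hi', 'hi', 'Hello']:
        source.emit(word)   # prints 'Hi' then 'Hello'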
def _truthy(x):
return not not x
@Stream.register_api()
class filter(Stream):
""" Only pass through elements that satisfy the predicate
Parameters
----------
predicate : function
The predicate. Should return True or False, where
True means that the predicate is satisfied.
Examples
--------
>>> source = Stream()
>>> source.filter(lambda x: x % 2 == 0).sink(print)
>>> for i in range(5):
... source.emit(i)
0
    2
    4
    """
    def __init__(self, upstream, predicate, **kwargs):
        if predicate is None:
            predicate = _truthy
        self.predicate = predicate

        Stream.__init__(self, upstream, **kwargs)

    def update(self, x, who=None):
        if self.predicate(x):
            return self._emit(x)


@Stream.register_api()
class timed_window(Stream):
    """ Emit a tuple of collected results every interval

    Every ``interval`` seconds this emits a tuple of all of the results
    seen so far. This can help to batch data coming off of a high-volume
    stream.
    """
    _graphviz_shape = 'octagon'

    def __init__(self, upstream, interval, loop=None, **kwargs):
        loop = loop or upstream.loop or IOLoop.current()
        self.interval = convert_interval(interval)
        self.buffer = []
        self.last = gen.moment

        Stream.__init__(self, upstream, loop=loop, **kwargs)

        self.loop.add_callback(self.cb)
def update(self, x, who=None):
self.buffer.append(x)
return self.last
@gen.coroutine
def cb(self):
while True:
L, self.buffer = self.buffer, []
self.last = self._emit(L)
yield self.last
yield gen.sleep(self.interval)
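# A hedged usage sketch of ``timed_window`` (assumes the class above; batches
# are only flushed while the stream's IOLoop is running):
def _example_timed_window():
    source = Stream()
    source.timed_window(0.1).sink(print)   # emits a list of elements every 0.1 s
    for i in range(5):
        source.emit(i)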
@Stream.register_api()
class delay(Stream):
""" Add a time delay to results """
_graphviz_shape = 'octagon'
def __init__(self, upstream, interval, loop=None, **kwargs):
loop = loop or upstream.loop or IOLoop.current()
self.interval = convert_interval(interval)
self.queue = Queue()
Stream.__init__(self, upstream, loop=loop, **kwargs)
self.loop.add_callback(self.cb)
@gen.coroutine
def cb(self):
while True:
            last = time()
            x = yield self.queue.get()
            yield self._emit(x)
            duration = self.interval - (time() - last)
            if duration > 0:
                yield gen.sleep(duration)

    def update(self, x, who=None):
        return self.queue.put(x)


@Stream.register_api(staticmethod)
class from_kafka(Source):
    """ Accepts messages from Kafka

    Uses the confluent-kafka library,
    https://docs.confluent.io/current/clients/confluent-kafka-python/
    """
    def start(self):
        import confluent_kafka as ck
        if self.stopped:
            self.stopped = False
            self.consumer = ck.Consumer(self.cpars)
            self.consumer.subscribe(self.topics)
            tp = ck.TopicPartition(self.topics[0], 0, 0)
# blocks for consumer thread to come up
self.consumer.get_watermark_offsets(tp)
self.loop.add_callback(self.poll_kafka)
def _close_consumer(self):
if self.consumer is not None:
consumer = self.consumer
self.consumer = None
consumer.unsubscribe()
consumer.close()
self.stopped = True
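# A hedged usage sketch of ``from_kafka`` (broker address, group id, and topic
# below are placeholders; requires confluent-kafka and a running broker):
def _example_from_kafka():
    source = Stream.from_kafka(
        ['my-topic'],
        {'bootstrap.servers': 'localhost:9092',
         'group.id': 'streamz-example'})
    source.map(lambda msg: msg.decode('utf-8')).sink(print)
    source.start()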
class FromKafkaBatched(Stream):
"""Base class for both local and cluster-based batched kafka processing"""
def __init__(self, topic, consumer_params, poll_interval='1s',
npartitions=1, max_batch_size=10000, keys=False,
engine=None, **kwargs):
self.consumer_params = consumer_params
# Override the auto-commit config to enforce custom streamz checkpointing
self.consumer_params['enable.auto.commit'] = 'false'
        if 'auto.offset.reset' not in self.consumer_params.keys():
            self.consumer_params['auto.offset.reset'] = 'earliest'
self.topic = topic
self.npartitions = npartitions
self.positions = [0] * npartitions
self.poll_interval = convert_interval(poll_interval)
self.max_batch_size = max_batch_size
self.keys = keys
        self.engine = engine

        super(FromKafkaBatched, self).__init__(**kwargs)
        self.stopped = True


@Stream.register_api(staticmethod)
def from_kafka_batched(topic, consumer_params, poll_interval='1s',
                       npartitions=1, start=False, dask=False,
                       max_batch_size=10000, keys=False, engine=None,
                       **kwargs):
    """ Get messages from Kafka in batches

    Uses the confluent-kafka library,
    https://docs.confluent.io/current/clients/confluent-kafka-python/
    """
    source = FromKafkaBatched(topic, consumer_params,
                              poll_interval=poll_interval,
                              npartitions=npartitions,
                              max_batch_size=max_batch_size,
                              keys=keys, engine=engine, **kwargs)
    if dask:
        source = source.scatter()
    if start:
        source.start()
    return source
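# A hedged usage sketch of ``from_kafka_batched`` (placeholder broker/topic;
# each emitted element is a batch of messages):
def _example_from_kafka_batched():
    source = Stream.from_kafka_batched(
        'my-topic',
        {'bootstrap.servers': 'localhost:9092',
         'group.id': 'streamz-batched-example'},
        poll_interval='1s', npartitions=4)
    source.map(len).sink(print)   # print the size of each batch
    source.start()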
def sink_to_file(filename, upstream, mode='w', prefix='', suffix='\n', flush=False):
file = open(filename, mode=mode)
def write(text):
file.write(prefix + text + suffix)
if flush:
file.flush()
upstream.sink(write)
return file
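# A hedged usage sketch of ``sink_to_file`` (the filename is a placeholder;
# note that elements must already be strings, hence the map(str)):
def _example_sink_to_file():
    source = Stream()
    f = sink_to_file('out.log', source.map(str), flush=True)
    for i in range(3):
        source.emit(i)
    f.close()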
class Source(Stream):
_graphviz_shape = 'doubleoctagon'
@Stream.register_api(staticmethod)
class from_textfile(Source):
""" Stream data from a text file
Parameters
----------
f: file or string
poll_interval: Number
Interval to poll file for new data in seconds
delimiter: str ("\n")
Character(s) to use to split the data into parts
start: bool (False)
Whether to start running immediately; otherwise call stream.start()