Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from examples.src.python.bolt import WindowSizeBolt
# Topology is defined using a topology builder
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # Exactly one command-line argument is expected: the topology name.
    if len(sys.argv) != 2:
        print("Topology's name is not specified")
        sys.exit(1)

    builder = TopologyBuilder(name=sys.argv[1])

    # Word source, added with par=2.
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)

    # Sliding-window settings handed to the bolt (duration 10, slide 2).
    window_settings = {
        SlidingWindowBolt.WINDOW_DURATION_SECS: 10,
        SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS: 2,
    }
    # The bolt consumes the spout through a fields grouping on 'word'.
    count_bolt = builder.add_bolt(
        "count_bolt",
        WindowSizeBolt,
        par=2,
        inputs={word_spout: Grouping.fields('word')},
        config=window_settings,
    )

    topology_config = {
        constants.TOPOLOGY_RELIABILITY_MODE:
            constants.TopologyReliabilityMode.ATLEAST_ONCE,
    }
    builder.set_config(topology_config)
    builder.build_and_submit()
from heronpy.api.bolt.window_bolt import SlidingWindowBolt
from heronpy.api.custom_grouping import ICustomGrouping
from heronpy.api.component.component_spec import GlobalStreamId
from heronpy.api.stream import Grouping
from heronpy.streamlet.streamlet import Streamlet
from heronpy.streamlet.window import Window
from heronpy.streamlet.windowconfig import WindowConfig
from heronpy.streamlet.impl.streamletboltbase import StreamletBoltBase
# pylint: disable=unused-argument
class ReduceByWindowBolt(SlidingWindowBolt, StreamletBoltBase):
    """Sliding-window bolt that folds every tuple of a window into one value.

    The reduce function is read from the component config under the
    ``FUNCTION`` key and is invoked as ``reduce_function(accumulated, value)``.
    """
    # Config key under which the user-supplied reduce function is stored.
    FUNCTION = 'function'
    # Aliases for the window settings defined by SlidingWindowBolt.
    WINDOWDURATION = SlidingWindowBolt.WINDOW_DURATION_SECS
    SLIDEINTERVAL = SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS

    def initialize(self, config, context):
        """Run the parent initialisation and load the reduce function.

        Raises:
            RuntimeError: if ``FUNCTION`` is absent from ``config`` or the
                configured value is not callable.
        """
        super(ReduceByWindowBolt, self).initialize(config, context)
        if ReduceByWindowBolt.FUNCTION not in config:
            raise RuntimeError("FUNCTION not specified in reducebywindow operator")
        self.reduce_function = config[ReduceByWindowBolt.FUNCTION]
        # Fail at start-up rather than on the first window; this mirrors the
        # validation done by ReduceByKeyAndWindowBolt.
        if not callable(self.reduce_function):
            raise RuntimeError("Reduce Function has to be callable")

    def processWindow(self, window_config, tuples):
        """Reduce the window's tuples and emit ``(Window, result)`` on 'output'.

        The accumulator starts as ``None``, so the reduce function receives
        ``None`` as its first argument for the first tuple, and an empty
        window emits ``None`` as its result.
        """
        result = None
        for tup in tuples:
            # The value to reduce is the first field of each tuple.
            result = self.reduce_function(result, tup.values[0])
        window = Window(window_config.start, window_config.end)
        self.emit([(window, result)], stream='output')
# pylint: disable=unused-argument
# Topology is defined using a topology builder
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # The script takes a single argument: the name of the topology.
    if len(sys.argv) != 2:
        print("Topology's name is not specified")
        sys.exit(1)

    builder = TopologyBuilder(name=sys.argv[1])
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)

    # Window duration and slide interval passed to the bolt's config.
    bolt_config = {
        SlidingWindowBolt.WINDOW_DURATION_SECS: 10,
        SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS: 2,
    }
    # Tuples reach the bolt through a fields grouping on 'word'.
    bolt_inputs = {word_spout: Grouping.fields('word')}
    count_bolt = builder.add_bolt("count_bolt", WindowSizeBolt, par=2,
                                  inputs=bolt_inputs, config=bolt_config)

    topology_config = {
        constants.TOPOLOGY_RELIABILITY_MODE:
            constants.TopologyReliabilityMode.ATLEAST_ONCE,
    }
    builder.set_config(topology_config)
    builder.build_and_submit()
"""module for bolt: ReduceByKeyAndWindowBolt"""
import collections
from heronpy.api.bolt.window_bolt import SlidingWindowBolt
from heronpy.api.custom_grouping import ICustomGrouping
from heronpy.api.component.component_spec import GlobalStreamId
from heronpy.api.stream import Grouping
from heronpy.streamlet.keyedwindow import KeyedWindow
from heronpy.streamlet.streamlet import Streamlet
from heronpy.streamlet.window import Window
from heronpy.streamlet.windowconfig import WindowConfig
from heronpy.streamlet.impl.streamletboltbase import StreamletBoltBase
# pylint: disable=unused-argument
class ReduceByKeyAndWindowBolt(SlidingWindowBolt, StreamletBoltBase):
"""ReduceByKeyAndWindowBolt"""
FUNCTION = 'function'
WINDOWDURATION = SlidingWindowBolt.WINDOW_DURATION_SECS
SLIDEINTERVAL = SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS
def initialize(self, config, context):
super(ReduceByKeyAndWindowBolt, self).initialize(config, context)
if ReduceByKeyAndWindowBolt.FUNCTION not in config:
raise RuntimeError("FUNCTION not specified in reducebywindow operator")
self.reduce_function = config[ReduceByKeyAndWindowBolt.FUNCTION]
if not callable(self.reduce_function):
raise RuntimeError("Reduce Function has to be callable")
@staticmethod
def _add(key, value, mymap):
if key in mymap:
"""module for bolt: ReduceByWindowBolt"""
import collections
from heronpy.api.bolt.window_bolt import SlidingWindowBolt
from heronpy.api.custom_grouping import ICustomGrouping
from heronpy.api.component.component_spec import GlobalStreamId
from heronpy.api.stream import Grouping
from heronpy.streamlet.streamlet import Streamlet
from heronpy.streamlet.window import Window
from heronpy.streamlet.windowconfig import WindowConfig
from heronpy.streamlet.impl.streamletboltbase import StreamletBoltBase
# pylint: disable=unused-argument
class ReduceByWindowBolt(SlidingWindowBolt, StreamletBoltBase):
    """Sliding-window bolt that reduces all tuples of a window to one value.

    The reduce function comes from the component config (``FUNCTION`` key)
    and is called as ``reduce_function(accumulated, value)``.
    """
    # Config key holding the user-supplied reduce function.
    FUNCTION = 'function'
    # Aliases for the window settings defined by SlidingWindowBolt.
    WINDOWDURATION = SlidingWindowBolt.WINDOW_DURATION_SECS
    SLIDEINTERVAL = SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS

    def initialize(self, config, context):
        """Run the parent initialisation and load the reduce function.

        Raises:
            RuntimeError: if ``FUNCTION`` is absent from ``config`` or the
                configured value is not callable.
        """
        super(ReduceByWindowBolt, self).initialize(config, context)
        if ReduceByWindowBolt.FUNCTION not in config:
            raise RuntimeError("FUNCTION not specified in reducebywindow operator")
        self.reduce_function = config[ReduceByWindowBolt.FUNCTION]
        # Reject a non-callable up front instead of failing inside the first
        # window; ReduceByKeyAndWindowBolt performs the same check.
        if not callable(self.reduce_function):
            raise RuntimeError("Reduce Function has to be callable")

    def processWindow(self, window_config, tuples):
        """Reduce the window's tuples and emit ``(Window, result)`` on 'output'.

        The accumulator starts as ``None``; the reduce function therefore
        sees ``None`` as its first argument on the first tuple.
        """
        result = None
        for tup in tuples:
            # The value to reduce is the first field of each tuple.
            result = self.reduce_function(result, tup.values[0])
        # Publish the reduced value; otherwise the computed result is
        # discarded and the bolt produces no output.
        window = Window(window_config.start, window_config.end)
        self.emit([(window, result)], stream='output')
def initialize(self, config, context):
    """Read the window settings from ``config`` and prepare the tuple buffer.

    The window duration is mandatory. The slide interval is optional and
    defaults to the window duration. The slide interval is also written back
    into ``config`` as the tick-tuple frequency, so ``config`` is mutated.

    Raises:
        RuntimeError: if the window duration is missing from ``config``.
    """
    duration_key = SlidingWindowBolt.WINDOW_DURATION_SECS
    slide_key = SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS

    if duration_key not in config:
        self.logger.fatal("Window Duration has to be specified in the config")
        # Assuming self.logger is a standard logging.Logger, fatal() only logs
        # and does not stop execution. Without an explicit error the code
        # below would fail with an unrelated AttributeError on
        # self.window_duration.
        raise RuntimeError("Window Duration has to be specified in the config")
    self.window_duration = int(config[duration_key])

    if slide_key in config:
        self.slide_interval = int(config[slide_key])
    else:
        self.slide_interval = self.window_duration
    if self.slide_interval > self.window_duration:
        # Kept as log-only, as in the original: initialisation continues.
        self.logger.fatal("Slide Interval should be <= Window Duration")

    # By modifying the config, we are able to setup the tick timer
    config[api_constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS] = str(self.slide_interval)

    # Start with an empty buffer unless previously saved tuples are available.
    self.current_tuples = deque()
    if hasattr(self, 'saved_state') and 'tuples' in self.saved_state:
        self.current_tuples = self.saved_state['tuples']
from heronpy.streamlet.windowconfig import WindowConfig
from heronpy.streamlet.impl.streamletboltbase import StreamletBoltBase
# pylint: disable=unused-argument
# pylint: disable=too-many-branches
class JoinBolt(SlidingWindowBolt, StreamletBoltBase):
"""JoinBolt"""
OUTER_LEFT = 1
INNER = 2
OUTER_RIGHT = 3
OUTER = 4
JOINFUNCTION = '__join_function__'
JOINTYPE = '__join_type__'
WINDOWDURATION = SlidingWindowBolt.WINDOW_DURATION_SECS
SLIDEINTERVAL = SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS
JOINEDCOMPONENT = '__joined_component__'
def _add(self, key, value, src_component, mymap):
if not key in mymap:
mymap[key] = ([], [])
# Join Output should be Key -> (V1, V2) where
# V1 is coming from the left stream and V2 coming
# from the right stream. In this case, _joined_component
# represents the right stream
if src_component == self._joined_component:
mymap[key][1].append(value)
else:
mymap[key][0].append(value)
def initialize(self, config, context):
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""module for example bolt: WindowSizeBolt"""
from heronpy.api.bolt.window_bolt import SlidingWindowBolt
# pylint: disable=unused-argument
class WindowSizeBolt(SlidingWindowBolt):
    """WindowSizeBolt

    Keeps a running mean of the number of tuples per window and logs it
    each time a window is processed.
    """

    def initialize(self, config, context):
        """Run the parent initialisation, then zero the running totals."""
        super(WindowSizeBolt, self).initialize(config, context)
        # numerator: tuples seen so far; denominator: windows seen so far.
        self.numerator = self.denominator = 0.0

    def processWindow(self, window_info, tuples):
        """Add this window's tuple count to the totals and log the average."""
        self.numerator = self.numerator + len(tuples)
        self.denominator = self.denominator + 1
        average = self.numerator / self.denominator
        self.logger.info("The current average is %f" % average)
from heronpy.api.custom_grouping import ICustomGrouping
from heronpy.api.component.component_spec import GlobalStreamId
from heronpy.api.stream import Grouping
from heronpy.streamlet.keyedwindow import KeyedWindow
from heronpy.streamlet.streamlet import Streamlet
from heronpy.streamlet.window import Window
from heronpy.streamlet.windowconfig import WindowConfig
from heronpy.streamlet.impl.streamletboltbase import StreamletBoltBase
# pylint: disable=unused-argument
class ReduceByKeyAndWindowBolt(SlidingWindowBolt, StreamletBoltBase):
    """Sliding-window bolt configured with a user-supplied reduce function.

    The reduce function is read from the component config under the
    ``FUNCTION`` key and must be callable.
    """
    # Config key under which the user-supplied reduce function is stored.
    FUNCTION = 'function'
    # Aliases for the window settings defined by SlidingWindowBolt.
    WINDOWDURATION = SlidingWindowBolt.WINDOW_DURATION_SECS
    SLIDEINTERVAL = SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS

    def initialize(self, config, context):
        """Run the parent initialisation and load the reduce function.

        Raises:
            RuntimeError: if ``FUNCTION`` is absent from ``config`` or the
                configured value is not callable.
        """
        super(ReduceByKeyAndWindowBolt, self).initialize(config, context)
        if ReduceByKeyAndWindowBolt.FUNCTION not in config:
            # The message names this operator; it previously said
            # "reducebywindow", which pointed at a different bolt.
            raise RuntimeError(
                "FUNCTION not specified in reducebykeyandwindow operator")
        self.reduce_function = config[ReduceByKeyAndWindowBolt.FUNCTION]
        if not callable(self.reduce_function):
            raise RuntimeError("Reduce Function has to be callable")

    @staticmethod
    def _add(key, value, mymap):
        """Append ``value`` to the list stored at ``mymap[key]``.

        A new list is created the first time ``key`` is seen.
        """
        mymap.setdefault(key, []).append(value)
from heronpy.api.bolt.window_bolt import SlidingWindowBolt
from heronpy.api.component.component_spec import GlobalStreamId
from heronpy.api.custom_grouping import ICustomGrouping
from heronpy.api.stream import Grouping
from heronpy.streamlet.keyedwindow import KeyedWindow
from heronpy.streamlet.streamlet import Streamlet
from heronpy.streamlet.window import Window
from heronpy.streamlet.windowconfig import WindowConfig
from heronpy.streamlet.impl.streamletboltbase import StreamletBoltBase
# pylint: disable=unused-argument
# pylint: disable=too-many-branches
class JoinBolt(SlidingWindowBolt, StreamletBoltBase):
"""JoinBolt"""
OUTER_LEFT = 1
INNER = 2
OUTER_RIGHT = 3
OUTER = 4
JOINFUNCTION = '__join_function__'
JOINTYPE = '__join_type__'
WINDOWDURATION = SlidingWindowBolt.WINDOW_DURATION_SECS
SLIDEINTERVAL = SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS
JOINEDCOMPONENT = '__joined_component__'
def _add(self, key, value, src_component, mymap):
if not key in mymap:
mymap[key] = ([], [])
# Join Output should be Key -> (V1, V2) where