Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# under the License.
"""Base bolt for integration tests"""
import copy
from heron.common.src.python.utils.log import Log
from heronpy.api.bolt.bolt import Bolt
from heronpy.api.stream import Stream
from heronpy.api.component.component_spec import HeronComponentSpec
import heron.common.src.python.pex_loader as pex_loader
from ..core import constants as integ_const
from .batch_bolt import BatchBolt
# pylint: disable=missing-docstring
class IntegrationTestBolt(Bolt):
"""Base bolt for integration test
Every bolt of integration test topology consists of this instance, each delegating user's bolt.
"""
outputs = [Stream(fields=[integ_const.INTEGRATION_TEST_TERMINAL],
name=integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)]
@classmethod
def spec(cls, name, par, inputs, config, user_bolt_classpath, user_output_fields=None):
python_class_path = "%s.%s" % (cls.__module__, cls.__name__)
config[integ_const.USER_BOLT_CLASSPATH] = user_bolt_classpath
# avoid modification to cls.outputs
_outputs = copy.copy(cls.outputs)
if user_output_fields is not None:
_outputs.extend(user_output_fields)
return HeronComponentSpec(name, python_class_path, is_spout=False, par=par,
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''half ack bolt'''
from heronpy.api.bolt.bolt import Bolt
class HalfAckBolt(Bolt):
    """Bolt that alternately acks and fails the data tuples it receives.

    A running count of processed tuples is kept in ``self.total``. A tuple
    that brings the count to an odd number is acked; one that brings it to
    an even number is failed, so half of the tuples end up acked and the
    other half failed.
    """

    # pylint: disable=unused-argument
    def initialize(self, config, context):
        """Start the processed-tuple count at zero (arguments are unused)."""
        self.total = 0

    def process(self, tup):
        """Count ``tup``, then ack it on an odd count or fail it on an even one."""
        self.total += 1
        on_odd_count = self.total % 2 != 0
        if on_odd_count:
            self.logger.debug("Acking a tuple: %s" % str(tup))
            self.ack(tup)
            return
        # Even count: this tuple belongs to the failed half.
        self.logger.debug("Failing a tuple: %s" % str(tup))
        self.fail(tup)

    def process_tick(self, tup):
        """Log that a tick tuple arrived; the tuple itself is not inspected."""
        self.log("Got tick tuple!")
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''module for example bolt: Consume Bolt'''
from heronpy.api.bolt.bolt import Bolt
# pylint: disable=unused-argument
class ConsumeBolt(Bolt):
    """Example bolt that only counts the data tuples it receives.

    The count is held in ``self.total`` and is reported through ``self.log``
    each time a tick tuple is processed.
    """

    def initialize(self, config, context):
        """Log a start-up message and zero the received-tuple count."""
        # NOTE(review): the message names "ConsumerBolt" although the class is
        # ConsumeBolt; the text is kept verbatim here.
        self.logger.info("In prepare() of ConsumerBolt")
        self.total = 0

    def process(self, tup):
        """Count one more received data tuple; ``tup`` is not inspected."""
        self.total += 1

    def process_tick(self, tup):
        """Log the tick, followed by the number of data tuples counted so far."""
        self.log("Got tick tuple!")
        received = self.total
        self.log("Total received data tuple: %d" % received)
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''window_bolt.py: API for defining windowed bolts in Heron'''
from abc import abstractmethod
from collections import namedtuple, deque
import time
from heronpy.api.bolt.bolt import Bolt
import heronpy.api.api_constants as api_constants
from heronpy.api.state.stateful_component import StatefulComponent
# Immutable two-field record (start, end); presumably the bounds of one window.
WindowContext = namedtuple('WindowContext', ['start', 'end'])
class SlidingWindowBolt(Bolt, StatefulComponent):
    """Higher-level bolt for users who work with batches of tuples per time window.

    The bolt keeps track of managing the window, adding and expiring tuples
    based on the window configuration, so that users only have to write the
    processWindow function.
    """

    # Key names for the window duration and slide interval, in seconds
    # (presumably looked up in the topology config; verify against callers).
    WINDOW_DURATION_SECS = 'slidingwindowbolt_duration_secs'
    WINDOW_SLIDEINTERVAL_SECS = 'slidingwindowbolt_slideinterval_secs'

    # pylint: disable=attribute-defined-outside-init
    def init_state(self, stateful_state):
        """Keep a reference to the state object handed in as ``stateful_state``."""
        self.saved_state = stateful_state

    # pylint: disable=unused-argument
    def pre_save(self, checkpoint_id):
        """Store the current tuples in the saved state under the 'tuples' key."""
        # current_tuples is assigned outside the code visible here.
        buffered = self.current_tuples
        self.saved_state['tuples'] = buffered
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""module for example bolt: CountBolt"""
from collections import Counter
import heronpy.api.global_metrics as global_metrics
from heronpy.api.bolt.bolt import Bolt
# pylint: disable=unused-argument
class CountBolt(Bolt):
"""CountBolt"""
# output field declarer
#outputs = ['word', 'count']
def initialize(self, config, context):
    """Create the word counter and total, then log the config and context.

    Args:
        config: logged via ``str()`` as the component-specific config.
        context: logged via ``str()`` as the context.
    """
    self.logger.info("In prepare() of CountBolt")
    self.counter, self.total = Counter(), 0
    # Emit the two dumps in the same order: config first, then context.
    for template, subject in (("Component-specific config: \n%s", config),
                              ("Context: \n%s", context)):
        self.logger.info(template % str(subject))
def _increment(self, word, inc_by):
    """Add ``inc_by`` to the count stored for ``word`` and to the running total."""
    self.counter[word] = self.counter[word] + inc_by
    self.total = self.total + inc_by