Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from ..core import constants as integ_const
from .aggregator_bolt import AggregatorBolt
from .integration_test_spout import IntegrationTestSpout
from .integration_test_bolt import IntegrationTestBolt
class TestTopologyBuilder(TopologyBuilder):
    """Topology builder for integration tests.

    Given spouts and bolts will be delegated by IntegrationTestSpout and
    IntegrationTestBolt classes respectively.

    Class attributes:
      TERMINAL_BOLT_NAME / TERMINAL_BOLT_CLASS: component name and class
        (AggregatorBolt) of a terminal aggregator bolt. Presumably appended
        as the last bolt of each test topology; that wiring is not visible
        here -- confirm.
      DEFAULT_CONFIG: topology config applied by ``__init__``: debug on,
        at-least-once reliability, project name "heron-integration-test".
    """
    # NOTE(review): TopologyBuilder and api_constants are not imported in the
    # visible import block -- confirm they are brought into scope elsewhere.
    TERMINAL_BOLT_NAME = '__integration_test_aggregator_bolt'
    TERMINAL_BOLT_CLASS = AggregatorBolt
    DEFAULT_CONFIG = {api_constants.TOPOLOGY_DEBUG: True,
                      api_constants.TOPOLOGY_RELIABILITY_MODE:
                          api_constants.TopologyReliabilityMode.ATLEAST_ONCE,
                      api_constants.TOPOLOGY_PROJECT_NAME: "heron-integration-test"}

    def __init__(self, name, http_server_url):
        """Create the builder and apply DEFAULT_CONFIG.

        :param name: topology name, forwarded to ``TopologyBuilder.__init__``
        :param http_server_url: base URL joined with ``self.topology_name``
            (as "<url>/<topology_name>") to form ``self.output_location``;
            presumably where test results are sent -- confirm
        """
        super(TestTopologyBuilder, self).__init__(name)
        # self.topology_name is presumably set by TopologyBuilder.__init__
        # (not visible here).
        self.output_location = "%s/%s" % (http_server_url, self.topology_name)
        # NOTE(review): the class-level dict is passed directly; if set_config
        # keeps a reference and mutates it, the change would leak into every
        # instance -- confirm set_config copies.
        self.set_config(self.DEFAULT_CONFIG)
        # map <component name -> spout's component spec> (keying presumed -- confirm)
        self.spouts = {}
        # map <component name -> bolt's component spec> (keying presumed -- confirm)
        self.bolts = {}
        # map <component name -> set of parents> (keying presumed -- confirm)
        self.prev = {}

    def add_spout(self, name, spout_cls, par, config=None,
                  optional_outputs=None, max_executions=None):
        """Add an integration_test spout.

        Per the class docstring, the spout is meant to be delegated through
        IntegrationTestSpout. ``par`` is presumably the parallelism, and
        ``optional_outputs`` / ``max_executions`` are presumably forwarded to
        that wrapper -- confirm against the full implementation.
        """
        # NOTE(review): only the docstring of this method is visible; the
        # implementation appears truncated. As shown it does nothing and
        # returns None.
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # Usage: <script> <topology_name>
    if len(sys.argv) != 2:
        # sys.exit(str) prints the message to stderr and exits with status 1,
        # so the error no longer goes to stdout.
        sys.exit("Topology's name is not specified")

    builder = TopologyBuilder(name=sys.argv[1])
    word_spout = builder.add_spout("word_spout", StatefulWordSpout, par=2)
    # Route tuples to the counter by their 'word' field. The bolt also gets a
    # tick tuple every 10 seconds. The returned spec is not needed, so it is
    # not bound to a name.
    builder.add_bolt("count_bolt", StatefulCountBolt, par=2,
                     inputs={word_spout: Grouping.fields('word')},
                     config={constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10})

    # Effectively-once (stateful) delivery, checkpointing every 30 seconds.
    topology_config = {constants.TOPOLOGY_RELIABILITY_MODE:
                           constants.TopologyReliabilityMode.EFFECTIVELY_ONCE,
                       constants.TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL_SECONDS: 30}
    builder.set_config(topology_config)
    builder.build_and_submit()
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # Usage: <script> <topology_name>
    if len(sys.argv) != 2:
        # sys.exit(str) prints the message to stderr and exits with status 1,
        # so the error no longer goes to stdout.
        sys.exit("Topology's name is not specified")

    builder = TopologyBuilder(name=sys.argv[1])
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)
    # Route tuples to the counter by their 'word' field. The bolt also gets a
    # tick tuple every 10 seconds. The returned spec is not needed, so it is
    # not bound to a name.
    builder.add_bolt("count_bolt", CountBolt, par=2,
                     inputs={word_spout: Grouping.fields('word')},
                     config={constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10})

    # At-least-once delivery for this (non-stateful) variant.
    topology_config = {constants.TOPOLOGY_RELIABILITY_MODE:
                           constants.TopologyReliabilityMode.ATLEAST_ONCE}
    builder.set_config(topology_config)
    builder.build_and_submit()
def __init__(self, pplan_helper, in_stream, out_stream, looper):
    """Set up the state common to an instance.

    Presumably the base-class constructor that BoltInstance delegates to
    (same signature) -- confirm.

    :param pplan_helper: physical-plan helper; its ``context`` supplies the
        cluster config and is passed to the serializer factory
    :param in_stream: incoming stream, stored unchanged
    :param out_stream: outgoing stream, wrapped in an OutgoingTupleHelper
    :param looper: looper object, stored unchanged
    """
    # Bookkeeping fields start unset; they are presumably filled in later
    # (not visible here).
    self._stateful_state = None
    self._initialized_global_metrics = False

    # Collaborators handed in by the caller.
    self.pplan_helper = pplan_helper
    self.in_stream = in_stream
    self.output_helper = OutgoingTupleHelper(self.pplan_helper, out_stream)
    self.looper = looper

    self.sys_config = system_config.get_sys_config()
    # Deliberately the root logger: getLogger() is called without a name.
    self.logger = logging.getLogger()

    # Reliability mode from the cluster config, defaulting to at-most-once.
    topology_context = pplan_helper.context
    cluster_config = topology_context.get_cluster_config()
    reliability_modes = api_constants.TopologyReliabilityMode
    reliability_mode = cluster_config.get(api_constants.TOPOLOGY_RELIABILITY_MODE,
                                          reliability_modes.ATMOST_ONCE)
    # Statefulness is enabled only for effectively-once delivery.
    self.is_stateful = bool(reliability_mode == reliability_modes.EFFECTIVELY_ONCE)

    self.serializer = SerializerHelper.get_serializer(pplan_helper.context)
def __init__(self, pplan_helper, in_stream, out_stream, looper):
    """Set up the state common to an instance.

    Presumably the base-class constructor that BoltInstance delegates to
    (same signature) -- confirm.

    :param pplan_helper: physical-plan helper; its ``context`` supplies the
        cluster config and is passed to the serializer factory
    :param in_stream: incoming stream, stored unchanged
    :param out_stream: outgoing stream, wrapped in an OutgoingTupleHelper
    :param looper: looper object, stored unchanged
    """
    # Bookkeeping fields start unset; they are presumably filled in later
    # (not visible here).
    self._stateful_state = None
    self._initialized_global_metrics = False

    # Collaborators handed in by the caller.
    self.pplan_helper = pplan_helper
    self.in_stream = in_stream
    self.output_helper = OutgoingTupleHelper(self.pplan_helper, out_stream)
    self.looper = looper

    self.sys_config = system_config.get_sys_config()
    # Deliberately the root logger: getLogger() is called without a name.
    self.logger = logging.getLogger()

    # Reliability mode from the cluster config, defaulting to at-most-once.
    topology_context = pplan_helper.context
    cluster_config = topology_context.get_cluster_config()
    reliability_modes = api_constants.TopologyReliabilityMode
    reliability_mode = cluster_config.get(api_constants.TOPOLOGY_RELIABILITY_MODE,
                                          reliability_modes.ATMOST_ONCE)
    # Statefulness is enabled only for effectively-once delivery.
    self.is_stateful = bool(reliability_mode == reliability_modes.EFFECTIVELY_ONCE)

    self.serializer = SerializerHelper.get_serializer(pplan_helper.context)
def set_delivery_semantics(self, semantics):
    """Record the topology reliability mode matching a Config semantics constant.

    Stores the corresponding ``api_constants.TopologyReliabilityMode`` value
    in ``self._api_config`` under ``api_constants.TOPOLOGY_RELIABILITY_MODE``.

    :param semantics: one of ``Config.ATMOST_ONCE``, ``Config.ATLEAST_ONCE``
        or ``Config.EFFECTIVELY_ONCE``
    :raises RuntimeError: if ``semantics`` equals none of them
    """
    modes = api_constants.TopologyReliabilityMode
    # Ordered (Config constant, reliability mode) pairs; the first match wins.
    translation = (
        (Config.ATMOST_ONCE, modes.ATMOST_ONCE),
        (Config.ATLEAST_ONCE, modes.ATLEAST_ONCE),
        (Config.EFFECTIVELY_ONCE, modes.EFFECTIVELY_ONCE),
    )
    for config_value, reliability_mode in translation:
        # `semantics` stays on the left of `==` so its own __eq__ is tried first.
        if semantics == config_value:
            self._api_config[api_constants.TOPOLOGY_RELIABILITY_MODE] = reliability_mode
            return
    raise RuntimeError("Unknown Topology delivery semantics %s" % str(semantics))
def __init__(self, pplan_helper, in_stream, out_stream, looper):
    """Initialise a bolt instance on top of the base-instance setup.

    Steps:
      * start in the PAUSED topology state;
      * reject physical plans that describe a spout;
      * enable acking only under at-least-once reliability;
      * instantiate the user's bolt class with ``delegate=self``.

    :raises RuntimeError: if ``self.pplan_helper.is_spout`` is true
    """
    super(BoltInstance, self).__init__(pplan_helper, in_stream, out_stream, looper)
    self.topology_state = topology_pb2.TopologyState.Value("PAUSED")

    # This instance type is only valid for a bolt.
    if self.pplan_helper.is_spout:
        raise RuntimeError("No bolt in physical plan")

    # Per the author: bolt_config is auto-typed, not <str -> str> only.
    ctx = self.pplan_helper.context
    self.bolt_metrics = BoltMetrics(self.pplan_helper)

    # Acking: on only for at-least-once; a missing setting means at-most-once.
    cluster_config = ctx.get_cluster_config()
    modes = api_constants.TopologyReliabilityMode
    reliability_mode = cluster_config.get(api_constants.TOPOLOGY_RELIABILITY_MODE,
                                          modes.ATMOST_ONCE)
    self.acking_enabled = bool(reliability_mode == modes.ATLEAST_ONCE)
    self._initialized_metrics_and_tasks = False
    Log.info("Enable ACK: %s" % str(self.acking_enabled))

    # Load the user's bolt class and bind it to this instance.
    user_bolt_cls = super(BoltInstance, self).load_py_instance(is_spout=False)
    self.bolt_impl = user_bolt_cls(delegate=self)
def __init__(self, pplan_helper, in_stream, out_stream, looper):
    """Initialise a bolt instance on top of the base-instance setup.

    Steps:
      * start in the PAUSED topology state;
      * reject physical plans that describe a spout;
      * enable acking only under at-least-once reliability;
      * instantiate the user's bolt class with ``delegate=self``.

    :raises RuntimeError: if ``self.pplan_helper.is_spout`` is true
    """
    super(BoltInstance, self).__init__(pplan_helper, in_stream, out_stream, looper)
    self.topology_state = topology_pb2.TopologyState.Value("PAUSED")

    # This instance type is only valid for a bolt.
    if self.pplan_helper.is_spout:
        raise RuntimeError("No bolt in physical plan")

    # Per the author: bolt_config is auto-typed, not <str -> str> only.
    ctx = self.pplan_helper.context
    self.bolt_metrics = BoltMetrics(self.pplan_helper)

    # Acking: on only for at-least-once; a missing setting means at-most-once.
    cluster_config = ctx.get_cluster_config()
    modes = api_constants.TopologyReliabilityMode
    reliability_mode = cluster_config.get(api_constants.TOPOLOGY_RELIABILITY_MODE,
                                          modes.ATMOST_ONCE)
    self.acking_enabled = bool(reliability_mode == modes.ATLEAST_ONCE)
    self._initialized_metrics_and_tasks = False
    Log.info("Enable ACK: %s" % str(self.acking_enabled))

    # Load the user's bolt class and bind it to this instance.
    user_bolt_cls = super(BoltInstance, self).load_py_instance(is_spout=False)
    self.bolt_impl = user_bolt_cls(delegate=self)
def set_delivery_semantics(self, semantics):
    """Record the topology reliability mode matching a Config semantics constant.

    Stores the corresponding ``api_constants.TopologyReliabilityMode`` value
    in ``self._api_config`` under ``api_constants.TOPOLOGY_RELIABILITY_MODE``.

    :param semantics: one of ``Config.ATMOST_ONCE``, ``Config.ATLEAST_ONCE``
        or ``Config.EFFECTIVELY_ONCE``
    :raises RuntimeError: if ``semantics`` equals none of them
    """
    modes = api_constants.TopologyReliabilityMode
    # Ordered (Config constant, reliability mode) pairs; the first match wins.
    translation = (
        (Config.ATMOST_ONCE, modes.ATMOST_ONCE),
        (Config.ATLEAST_ONCE, modes.ATLEAST_ONCE),
        (Config.EFFECTIVELY_ONCE, modes.EFFECTIVELY_ONCE),
    )
    for config_value, reliability_mode in translation:
        # `semantics` stays on the left of `==` so its own __eq__ is tried first.
        if semantics == config_value:
            self._api_config[api_constants.TOPOLOGY_RELIABILITY_MODE] = reliability_mode
            return
    raise RuntimeError("Unknown Topology delivery semantics %s" % str(semantics))