Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from heronpy.api.topology import TopologyBuilder, Topology, TopologyType
from ..core import constants as integ_const
from .aggregator_bolt import AggregatorBolt
from .integration_test_spout import IntegrationTestSpout
from .integration_test_bolt import IntegrationTestBolt
class TestTopologyBuilder(TopologyBuilder):
"""Topology Builder for integration tests
Given spouts and bolts will be delegated by IntegrationTestSpout and IntegrationTestBolt
classes respectively.
"""
TERMINAL_BOLT_NAME = '__integration_test_aggregator_bolt'
TERMINAL_BOLT_CLASS = AggregatorBolt
DEFAULT_CONFIG = {api_constants.TOPOLOGY_DEBUG: True,
api_constants.TOPOLOGY_RELIABILITY_MODE:
api_constants.TopologyReliabilityMode.ATLEAST_ONCE,
api_constants.TOPOLOGY_PROJECT_NAME: "heron-integration-test"}
def __init__(self, name, http_server_url):
    """Set up a topology builder for an integration test.

    Args:
        name: topology name, forwarded to the parent builder's initializer.
        http_server_url: base URL joined with ``self.topology_name`` to form
            ``self.output_location``.
    """
    super(TestTopologyBuilder, self).__init__(name)

    # Output location has the form "<http_server_url>/<topology name>".
    # ``topology_name`` is not assigned in this method; presumably the
    # parent initializer sets it -- TODO confirm.
    location_parts = (http_server_url, self.topology_name)
    self.output_location = "%s/%s" % location_parts

    self.set_config(self.DEFAULT_CONFIG)

    # Bookkeeping tables, all empty at construction. They are presumably
    # keyed by component name -- TODO confirm against add_spout/add_bolt.
    self.spouts = {}  # spouts' component specs
    self.bolts = {}   # bolts' component specs
    self.prev = {}    # sets of parents
def add_spout(self, name, spout_cls, par, config=None,
optional_outputs=None, max_executions=None):
# Topology is defined using a topology builder
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # Exactly one command-line argument, the topology name, is required.
    cli_args = sys.argv
    if len(cli_args) != 2:
        print("Topology's name is not specified")
        sys.exit(1)

    builder = TopologyBuilder(name=cli_args[1])

    # Spout first: its return value is used as the key of the bolt's inputs.
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)

    # The bolt consumes the spout's output under a fields grouping on 'word'.
    bolt_inputs = {word_spout: Grouping.fields('word')}
    bolt_config = {constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10}
    half_ack_bolt = builder.add_bolt(
        "half_ack_bolt",
        HalfAckBolt,
        par=2,
        inputs=bolt_inputs,
        config=bolt_config,
    )

    # Topology-wide settings, applied before the topology is built and submitted.
    topology_config = {
        constants.TOPOLOGY_RELIABILITY_MODE:
            constants.TopologyReliabilityMode.ATLEAST_ONCE,
        constants.TOPOLOGY_MAX_SPOUT_PENDING: 100000000,
        constants.TOPOLOGY_MESSAGE_TIMEOUT_SECS: 300,
    }
    builder.set_config(topology_config)
    builder.build_and_submit()
# Topology is defined using a topology builder
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # Exactly one command-line argument, the topology name, is required.
    cli_args = sys.argv
    if len(cli_args) != 2:
        print("Topology's name is not specified")
        sys.exit(1)

    builder = TopologyBuilder(name=cli_args[1])

    # Spout first: its return value is used as the key of the bolt's inputs.
    word_spout = builder.add_spout("word_spout", StatefulWordSpout, par=2)

    # The bolt consumes the spout's output under a fields grouping on 'word'.
    bolt_inputs = {word_spout: Grouping.fields('word')}
    bolt_config = {constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10}
    count_bolt = builder.add_bolt(
        "count_bolt",
        StatefulCountBolt,
        par=2,
        inputs=bolt_inputs,
        config=bolt_config,
    )

    # EFFECTIVELY_ONCE reliability with a checkpoint interval setting of 30.
    topology_config = {
        constants.TOPOLOGY_RELIABILITY_MODE:
            constants.TopologyReliabilityMode.EFFECTIVELY_ONCE,
        constants.TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL_SECONDS: 30,
    }
    builder.set_config(topology_config)
    builder.build_and_submit()
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # Exactly one command-line argument, the topology name, is required.
    cli_args = sys.argv
    if len(cli_args) != 2:
        print("Topology's name is not specified")
        sys.exit(1)

    builder = TopologyBuilder(name=cli_args[1])

    # Spout first: its return value is used as the key of the bolt's inputs.
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)

    # The bolt consumes the spout's output under a fields grouping on 'word';
    # its per-component config carries the sliding-window settings.
    bolt_inputs = {word_spout: Grouping.fields('word')}
    window_config = {
        SlidingWindowBolt.WINDOW_DURATION_SECS: 10,
        SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS: 2,
    }
    count_bolt = builder.add_bolt(
        "count_bolt",
        WindowSizeBolt,
        par=2,
        inputs=bolt_inputs,
        config=window_config,
    )

    topology_config = {
        constants.TOPOLOGY_RELIABILITY_MODE:
            constants.TopologyReliabilityMode.ATLEAST_ONCE,
    }
    builder.set_config(topology_config)
    builder.build_and_submit()
# Topology is defined using a topology builder
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # Exactly one command-line argument, the topology name, is required.
    cli_args = sys.argv
    if len(cli_args) != 2:
        print("Topology's name is not specified")
        sys.exit(1)

    builder = TopologyBuilder(name=cli_args[1])

    # Spout first: its return value is used as the key of the bolt's inputs.
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)

    # The bolt consumes the spout's output under a fields grouping on 'word'.
    bolt_inputs = {word_spout: Grouping.fields('word')}
    bolt_config = {constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10}
    count_bolt = builder.add_bolt(
        "count_bolt",
        CountBolt,
        par=2,
        inputs=bolt_inputs,
        config=bolt_config,
    )

    topology_config = {
        constants.TOPOLOGY_RELIABILITY_MODE:
            constants.TopologyReliabilityMode.ATLEAST_ONCE,
    }
    builder.set_config(topology_config)
    builder.build_and_submit()
def __init__(self, pplan_helper, in_stream, out_stream, looper):
    """Store the instance's collaborators and derive its reliability state.

    Args:
        pplan_helper: physical-plan helper; its ``context`` supplies the
            cluster config and is passed to the serializer lookup.
        in_stream: kept as ``self.in_stream``.
        out_stream: handed, together with ``pplan_helper``, to
            ``OutgoingTupleHelper``.
        looper: kept as ``self.looper``.
    """
    self.pplan_helper = pplan_helper
    self.in_stream = in_stream
    self.output_helper = OutgoingTupleHelper(self.pplan_helper, out_stream)
    self.looper = looper
    self.sys_config = system_config.get_sys_config()

    # Called without a name, so this is the root logger.
    self.logger = logging.getLogger()

    # The reliability mode falls back to ATMOST_ONCE when the cluster config
    # has no entry; only EFFECTIVELY_ONCE marks the instance as stateful.
    reliability_modes = api_constants.TopologyReliabilityMode
    cluster_config = pplan_helper.context.get_cluster_config()
    mode = cluster_config.get(
        api_constants.TOPOLOGY_RELIABILITY_MODE,
        reliability_modes.ATMOST_ONCE,
    )
    self.is_stateful = bool(mode == reliability_modes.EFFECTIVELY_ONCE)
    self._stateful_state = None

    self.serializer = SerializerHelper.get_serializer(pplan_helper.context)
    self._initialized_global_metrics = False
import heronpy.api.api_constants as api_constants
import six
from heronpy.api.component.component_spec import HeronComponentSpec
from heronpy.api.serializer import default_serializer
from heronpy.proto import topology_pb2
class TopologyType(type):
"""Metaclass to define a Heron topology in Python"""
DEFAULT_TOPOLOGY_CONFIG = {
api_constants.TOPOLOGY_DEBUG: "false",
api_constants.TOPOLOGY_STMGRS: "1",
api_constants.TOPOLOGY_MESSAGE_TIMEOUT_SECS: "30",
api_constants.TOPOLOGY_COMPONENT_PARALLELISM: "1",
api_constants.TOPOLOGY_MAX_SPOUT_PENDING: "100",
api_constants.TOPOLOGY_RELIABILITY_MODE: api_constants.TopologyReliabilityMode.ATMOST_ONCE,
api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: "true"}
def __new__(mcs, classname, bases, class_dict):
bolt_specs = {}
spout_specs = {}
# Copy HeronComponentSpec items out of class_dict
specs = TopologyType.class_dict_to_specs(class_dict)
for spec in iter(specs.values()):
if spec.is_spout:
TopologyType.add_spout_specs(spec, spout_specs)
else:
TopologyType.add_bolt_specs(spec, bolt_specs)
if classname != 'Topology' and not spout_specs:
raise ValueError("A Topology requires at least one Spout")
topology_config = TopologyType.class_dict_to_topo_config(class_dict)
def __init__(self, pplan_helper, in_stream, out_stream, looper):
    """Initialize a spout instance on top of the shared base initializer.

    Args:
        pplan_helper: physical-plan helper; must report ``is_spout``.
        in_stream: forwarded to the base initializer.
        out_stream: forwarded to the base initializer.
        looper: forwarded to the base initializer.

    Raises:
        RuntimeError: if ``pplan_helper.is_spout`` is falsy.
    """
    super(SpoutInstance, self).__init__(pplan_helper, in_stream, out_stream, looper)
    self.topology_state = topology_pb2.TopologyState.Value("PAUSED")

    if not self.pplan_helper.is_spout:
        # NOTE(review): "physicial" is misspelled in this message; it is kept
        # byte-for-byte here so the runtime text does not change.
        raise RuntimeError("No spout in physicial plan")

    context = self.pplan_helper.context
    self.spout_metrics = SpoutMetrics(self.pplan_helper)

    # Acking is on only when the configured reliability mode is ATLEAST_ONCE;
    # a missing entry falls back to ATMOST_ONCE.
    reliability_modes = api_constants.TopologyReliabilityMode
    mode = context.get_cluster_config().get(
        api_constants.TOPOLOGY_RELIABILITY_MODE,
        reliability_modes.ATMOST_ONCE,
    )
    self.acking_enabled = bool(mode == reliability_modes.ATLEAST_ONCE)

    # No default is supplied, so this is whatever .get() yields for a
    # missing key.
    self.enable_message_timeouts = context.get_cluster_config().get(
        api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)
    self._initialized_metrics_and_tasks = False

    Log.info("Enable ACK: %s" % str(self.acking_enabled))
    Log.info("Enable Message Timeouts: %s" % str(self.enable_message_timeouts))

    # tuple_info entries kept in insertion order (OrderedDict); the key
    # scheme is not visible in this method.
    self.in_flight_tuples = collections.OrderedDict()
    self.immediate_acks = collections.deque()
    self.total_tuples_emitted = 0

    # Load the user's spout class and instantiate it with this object as
    # its delegate.
    spout_impl_class = super(SpoutInstance, self).load_py_instance(is_spout=True)
    self.spout_impl = spout_impl_class(delegate=self)
def __init__(self, pplan_helper, in_stream, out_stream, looper):
    """Initialize a bolt instance on top of the shared base initializer.

    Args:
        pplan_helper: physical-plan helper; must not report ``is_spout``.
        in_stream: forwarded to the base initializer.
        out_stream: forwarded to the base initializer.
        looper: forwarded to the base initializer.

    Raises:
        RuntimeError: if ``pplan_helper.is_spout`` is truthy.
    """
    super(BoltInstance, self).__init__(pplan_helper, in_stream, out_stream, looper)
    self.topology_state = topology_pb2.TopologyState.Value("PAUSED")

    if self.pplan_helper.is_spout:
        raise RuntimeError("No bolt in physical plan")

    # NOTE(review): the original comment said the bolt config is auto-typed
    # rather than string-only; nothing in this method shows that -- confirm.
    context = self.pplan_helper.context
    self.bolt_metrics = BoltMetrics(self.pplan_helper)

    # Acking is on only when the configured reliability mode is ATLEAST_ONCE;
    # a missing entry falls back to ATMOST_ONCE.
    reliability_modes = api_constants.TopologyReliabilityMode
    mode = context.get_cluster_config().get(
        api_constants.TOPOLOGY_RELIABILITY_MODE,
        reliability_modes.ATMOST_ONCE,
    )
    self.acking_enabled = bool(mode == reliability_modes.ATLEAST_ONCE)
    self._initialized_metrics_and_tasks = False

    Log.info("Enable ACK: %s" % str(self.acking_enabled))

    # Load the user's bolt class and instantiate it with this object as
    # its delegate.
    bolt_impl_class = super(BoltInstance, self).load_py_instance(is_spout=False)
    self.bolt_impl = bolt_impl_class(delegate=self)
def set_delivery_semantics(self, semantics):
    """Record the topology reliability mode that matches ``semantics``.

    Args:
        semantics: one of ``Config.ATMOST_ONCE``, ``Config.ATLEAST_ONCE`` or
            ``Config.EFFECTIVELY_ONCE``.

    Raises:
        RuntimeError: if ``semantics`` equals none of the three values.
    """
    # Pick the reliability mode first; the config is written once, below.
    if semantics == Config.ATMOST_ONCE:
        reliability_mode = api_constants.TopologyReliabilityMode.ATMOST_ONCE
    elif semantics == Config.ATLEAST_ONCE:
        reliability_mode = api_constants.TopologyReliabilityMode.ATLEAST_ONCE
    elif semantics == Config.EFFECTIVELY_ONCE:
        reliability_mode = api_constants.TopologyReliabilityMode.EFFECTIVELY_ONCE
    else:
        raise RuntimeError("Unknown Topology delivery semantics %s" % str(semantics))

    self._api_config[api_constants.TOPOLOGY_RELIABILITY_MODE] = reliability_mode