Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # The topology name must be the one and only command-line argument.
    if len(sys.argv) != 2:
        print("Topology's name is not specified")
        sys.exit(1)

    topology_name = sys.argv[1]
    builder = TopologyBuilder(name=topology_name)

    # Spout and bolt are both registered with par=2; the bolt consumes the
    # spout's output using a fields grouping on 'word'.
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)
    bolt_inputs = {word_spout: Grouping.fields('word')}
    bolt_config = {constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10}
    half_ack_bolt = builder.add_bolt(
        "half_ack_bolt",
        HalfAckBolt,
        par=2,
        inputs=bolt_inputs,
        config=bolt_config,
    )

    # Topology-wide settings: at-least-once reliability mode, the max spout
    # pending value and the message timeout (in seconds).
    topology_config = {
        constants.TOPOLOGY_RELIABILITY_MODE: constants.TopologyReliabilityMode.ATLEAST_ONCE,
        constants.TOPOLOGY_MAX_SPOUT_PENDING: 100000000,
        constants.TOPOLOGY_MESSAGE_TIMEOUT_SECS: 300,
    }
    builder.set_config(topology_config)
    builder.build_and_submit()
from examples.src.python.bolt import HalfAckBolt
# Topology is defined using a topology builder
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # Bail out early unless exactly one argument (the topology name) is given.
    if len(sys.argv) != 2:
        print("Topology's name is not specified")
        sys.exit(1)

    builder = TopologyBuilder(name=sys.argv[1])

    # Register the spout first: its spec is the key of the bolt's inputs map.
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)

    grouping_by_word = Grouping.fields('word')
    tick_settings = {constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10}
    half_ack_bolt = builder.add_bolt(
        "half_ack_bolt",
        HalfAckBolt,
        par=2,
        inputs={word_spout: grouping_by_word},
        config=tick_settings,
    )

    # Assemble the topology-level configuration entry by entry.
    topology_config = {}
    topology_config[constants.TOPOLOGY_RELIABILITY_MODE] = \
        constants.TopologyReliabilityMode.ATLEAST_ONCE
    topology_config[constants.TOPOLOGY_MAX_SPOUT_PENDING] = 100000000
    topology_config[constants.TOPOLOGY_MESSAGE_TIMEOUT_SECS] = 300

    builder.set_config(topology_config)
    builder.build_and_submit()
from examples.src.python.bolt import StatefulCountBolt
# Topology is defined using a topology builder
# Refer to multi_stream_topology for defining a topology by subclassing Topology
# pylint: disable=superfluous-parens
if __name__ == '__main__':
    # The topology name must be the one and only command-line argument.
    if len(sys.argv) != 2:
        print("Topology's name is not specified")
        sys.exit(1)

    topology_name = sys.argv[1]
    builder = TopologyBuilder(name=topology_name)

    # Stateful spout feeding a stateful counting bolt, both with par=2; the
    # bolt receives the spout's output using a fields grouping on 'word'.
    word_spout = builder.add_spout("word_spout", StatefulWordSpout, par=2)
    bolt_inputs = {word_spout: Grouping.fields('word')}
    bolt_config = {constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10}
    count_bolt = builder.add_bolt(
        "count_bolt",
        StatefulCountBolt,
        par=2,
        inputs=bolt_inputs,
        config=bolt_config,
    )

    # Topology-wide settings: effectively-once reliability mode plus the
    # stateful checkpoint interval (in seconds).
    topology_config = {
        constants.TOPOLOGY_RELIABILITY_MODE: constants.TopologyReliabilityMode.EFFECTIVELY_ONCE,
        constants.TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL_SECONDS: 30,
    }
    builder.set_config(topology_config)
    builder.build_and_submit()
def get_nstmgrs(topology):
    """Return the number of stream managers configured for a topology.

    :param topology: the proto object describing the topology
    :return: the configured stream manager count as an ``int``. This equals
             the number of containers. Falls back to 1 when the setting is
             absent (or otherwise falsy).
    """
    configured = get_topology_config(topology, api_constants.TOPOLOGY_STMGRS)
    # Any falsy lookup result (e.g. None or an empty string) means "use the default".
    if not configured:
        configured = 1
    return int(configured)
def initialize(self, config, context):
    """Set up the window duration, the slide interval and the tuple buffer.

    :param config: mutable config mapping. The window duration and
                   (optionally) the slide interval are read from it, and the
                   tick tuple frequency is written back into it.
    :param context: not used by this method
    """
    duration_key = SlidingWindowBolt.WINDOW_DURATION_SECS
    if duration_key not in config:
        # NOTE(review): execution continues after this call, so unless
        # `logger.fatal` aborts, `self.window_duration` may be unset when it
        # is read below -- confirm the intended behaviour.
        self.logger.fatal("Window Duration has to be specified in the config")
    else:
        self.window_duration = int(config[duration_key])

    # With no explicit slide interval, the window slides by its full duration.
    slide_key = SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS
    if slide_key not in config:
        self.slide_interval = self.window_duration
    else:
        self.slide_interval = int(config[slide_key])

    if self.slide_interval > self.window_duration:
        self.logger.fatal("Slide Interval should be <= Window Duration")

    # By modifying the config, we are able to setup the tick timer
    config[api_constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS] = str(self.slide_interval)

    # Start from an empty buffer, then adopt previously saved tuples if this
    # instance carries a 'saved_state' that contains them.
    self.current_tuples = deque()
    if hasattr(self, 'saved_state') and 'tuples' in self.saved_state:
        self.current_tuples = self.saved_state['tuples']
def get_serializer(context):
    """Return the serializer instance to use for the given context.

    :param context: object exposing ``get_cluster_config()`` and
                    ``get_topology_pex_path()``
    :return: a ``PythonSerializer`` when no custom serializer class name is
             configured; otherwise an instance of the configured class,
             loaded from the topology pex
    :raises RuntimeError: if loading or instantiating the custom class fails
    """
    serializer_clsname = context.get_cluster_config().get(
        constants.TOPOLOGY_SERIALIZER_CLASSNAME, None)

    # No custom class configured: use the default serializer.
    if serializer_clsname is None:
        return PythonSerializer()

    try:
        pex_path = context.get_topology_pex_path()
        pex_loader.load_pex(pex_path)
        custom_cls = pex_loader.import_and_get_class(pex_path, serializer_clsname)
        # Instantiation stays inside the try so constructor failures are
        # reported through the same RuntimeError.
        return custom_cls()
    except Exception as e:
        raise RuntimeError("Error with loading custom serializer class: %s, with error message: %s"
                           % (serializer_clsname, str(e)))
"""
import os
import uuid
import heronpy.api.api_constants as api_constants
import six
from heronpy.api.component.component_spec import HeronComponentSpec
from heronpy.api.serializer import default_serializer
from heronpy.proto import topology_pb2
class TopologyType(type):
"""Metaclass to define a Heron topology in Python"""
DEFAULT_TOPOLOGY_CONFIG = {
api_constants.TOPOLOGY_DEBUG: "false",
api_constants.TOPOLOGY_STMGRS: "1",
api_constants.TOPOLOGY_MESSAGE_TIMEOUT_SECS: "30",
api_constants.TOPOLOGY_COMPONENT_PARALLELISM: "1",
api_constants.TOPOLOGY_MAX_SPOUT_PENDING: "100",
api_constants.TOPOLOGY_RELIABILITY_MODE: api_constants.TopologyReliabilityMode.ATMOST_ONCE,
api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: "true"}
def __new__(mcs, classname, bases, class_dict):
bolt_specs = {}
spout_specs = {}
# Copy HeronComponentSpec items out of class_dict
specs = TopologyType.class_dict_to_specs(class_dict)
for spec in iter(specs.values()):
if spec.is_spout:
TopologyType.add_spout_specs(spec, spout_specs)
else:
TopologyType.add_bolt_specs(spec, bolt_specs)
def _prepare_tick_tup_timer(self):
    """Register a timer task that delivers a tick tuple to the bolt.

    Does nothing when the cluster config has no tick tuple frequency. The
    registered task re-invokes this method after each tick, which registers
    the timer task again.
    """
    cluster_config = self.pplan_helper.context.get_cluster_config()
    freq_key = api_constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS
    if freq_key not in cluster_config:
        return

    tick_freq_sec = cluster_config[freq_key]
    Log.debug("Tick Tuple Frequency: %s sec." % str(tick_freq_sec))

    def send_tick():
        """Process one tick tuple, record its latency and re-register the timer."""
        tick = TupleHelper.make_tick_tuple()

        started_at = time.time()
        self.bolt_impl.process_tick(tick)
        elapsed_sec = time.time() - started_at
        latency_ns = elapsed_sec * system_constants.SEC_TO_NS

        self.bolt_metrics.execute_tuple(tick.id, tick.component, latency_ns)
        self.output_helper.send_out_tuples()
        self.looper.wake_up()  # so emitted tuples would be added to buffer now
        self._prepare_tick_tup_timer()

    self.looper.register_timer_task_in_sec(send_tick, tick_freq_sec)