Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def one_spout_bolt_multi_tasks_builder(topology_name, http_server_url):
    """Integration test topology builder: multi-task spout into multi-task bolt.

    Registers an ``ABSpout`` named "ab-spout" (third argument 3) and an
    ``IdentityBolt`` named "identity-bolt" (``par=3``) that consumes the
    spout with shuffle grouping, then returns the created topology.

    NOTE(review): the numeric arguments are presumably task parallelism --
    confirm against ``TestTopologyBuilder.add_spout`` / ``add_bolt``.
    """
    topology_builder = TestTopologyBuilder(topology_name, http_server_url)
    spout = topology_builder.add_spout("ab-spout", ABSpout, 3)

    # The bolt's only input is the spout above, shuffle-grouped.
    bolt_inputs = {spout: Grouping.SHUFFLE}
    topology_builder.add_bolt(
        "identity-bolt",
        IdentityBolt,
        inputs=bolt_inputs,
        par=3,
        optional_outputs=['word'],
    )
    return topology_builder.create_topology()
def one_spout_multi_tasks_builder(topology_name, http_server_url):
    """Integration test topology builder: multi-task spout into a single bolt.

    Registers an ``ABSpout`` named "ab-spout" (third argument 3) and an
    ``IdentityBolt`` named "identity-bolt" (``par=1``) that consumes the
    spout with shuffle grouping, then returns the created topology.

    NOTE(review): the numeric arguments are presumably task parallelism --
    confirm against ``TestTopologyBuilder.add_spout`` / ``add_bolt``.
    """
    topology_builder = TestTopologyBuilder(topology_name, http_server_url)
    spout = topology_builder.add_spout("ab-spout", ABSpout, 3)

    # The bolt's only input is the spout above, shuffle-grouped.
    bolt_inputs = {spout: Grouping.SHUFFLE}
    topology_builder.add_bolt(
        "identity-bolt",
        IdentityBolt,
        inputs=bolt_inputs,
        par=1,
        optional_outputs=['word'],
    )
    return topology_builder.create_topology()
def shuffle_grouping_builder(topology_name, http_server_url):
    """Integration test topology builder exercising shuffle grouping.

    Registers an ``ABSpout`` named "ab-spout" (third argument 1) and an
    ``IdentityBolt`` named "identity-bolt" (``par=3``) that consumes the
    spout with ``Grouping.SHUFFLE``, then returns the created topology.

    NOTE(review): the numeric arguments are presumably task parallelism --
    confirm against ``TestTopologyBuilder.add_spout`` / ``add_bolt``.
    """
    topology_builder = TestTopologyBuilder(topology_name, http_server_url)
    spout = topology_builder.add_spout("ab-spout", ABSpout, 1)

    # The bolt's only input is the spout above, shuffle-grouped.
    bolt_inputs = {spout: Grouping.SHUFFLE}
    topology_builder.add_bolt(
        "identity-bolt",
        IdentityBolt,
        inputs=bolt_inputs,
        par=3,
        optional_outputs=['word'],
    )
    return topology_builder.create_topology()
def bolt_double_emit_tuples_builder(topology_name, http_server_url):
    """Integration test topology builder using ``DoubleTuplesBolt``.

    Registers an ``ABSpout`` named "ab-spout" (third argument 1) and a
    ``DoubleTuplesBolt`` named "double-tuples-bolt" (``par=1``) that consumes
    the spout with shuffle grouping, then returns the created topology.

    NOTE(review): the numeric arguments are presumably task parallelism --
    confirm against ``TestTopologyBuilder.add_spout`` / ``add_bolt``.
    """
    topology_builder = TestTopologyBuilder(topology_name, http_server_url)
    spout = topology_builder.add_spout("ab-spout", ABSpout, 1)

    # The bolt's only input is the spout above, shuffle-grouped.
    bolt_inputs = {spout: Grouping.SHUFFLE}
    topology_builder.add_bolt(
        "double-tuples-bolt",
        DoubleTuplesBolt,
        inputs=bolt_inputs,
        par=1,
        optional_outputs=['word'],
    )
    return topology_builder.create_topology()
def basic_one_task_builder(topology_name, http_server_url):
    """Integration test topology builder for the basic one-task case.

    Registers an ``ABSpout`` named "ab-spout" (third argument 1) and an
    ``IdentityBolt`` named "identity-bolt" (``par=1``) that consumes the
    spout with shuffle grouping, then returns the created topology.

    NOTE(review): the numeric arguments are presumably task parallelism --
    confirm against ``TestTopologyBuilder.add_spout`` / ``add_bolt``.
    """
    topology_builder = TestTopologyBuilder(topology_name, http_server_url)
    spout = topology_builder.add_spout("ab-spout", ABSpout, 1)

    # The bolt's only input is the spout above, shuffle-grouped.
    bolt_inputs = {spout: Grouping.SHUFFLE}
    topology_builder.add_bolt(
        "identity-bolt",
        IdentityBolt,
        inputs=bolt_inputs,
        par=1,
        optional_outputs=['word'],
    )
    return topology_builder.create_topology()
def _calculate_inputs(self):
    """Return the inputs mapping for this component.

    The mapping has a single entry: a ``GlobalStreamId`` built from the
    parent's name and the parent's ``_output``, mapped to
    ``Grouping.SHUFFLE``.
    """
    upstream = self._parent
    stream_id = GlobalStreamId(upstream.get_name(), upstream._output)
    return {stream_id: Grouping.SHUFFLE}
def _calculate_inputs(self):
    """Return the inputs mapping for this component.

    The mapping has a single entry: a ``GlobalStreamId`` built from the
    parent's name and the parent's ``_output``, mapped to
    ``Grouping.SHUFFLE``.
    """
    upstream = self._parent
    stream_id = GlobalStreamId(upstream.get_name(), upstream._output)
    return {stream_id: Grouping.SHUFFLE}
ret[key] = grouping
else:
raise ValueError("%s is not supported as a key to inputs" % str(key))
elif isinstance(self.inputs, (list, tuple)):
# inputs are lists, must be either a list of HeronComponentSpec or GlobalStreamId
# will use SHUFFLE grouping
for input_obj in self.inputs:
if isinstance(input_obj, HeronComponentSpec):
if input_obj.name is None:
# should not happen as TopologyType metaclass sets name attribute
# before calling this method
raise RuntimeError("In _sanitize_inputs(): HeronComponentSpec doesn't have a name")
global_streamid = GlobalStreamId(input_obj.name, Stream.DEFAULT_STREAM_ID)
ret[global_streamid] = Grouping.SHUFFLE
elif isinstance(input_obj, GlobalStreamId):
ret[input_obj] = Grouping.SHUFFLE
else:
raise ValueError("%s is not supported as an input" % str(input_obj))
else:
raise TypeError("Inputs must be a list, dict, or None, given: %s" % str(self.inputs))
return ret
def _calculate_inputs(self):
    """Return the inputs mapping for this component.

    The mapping has a single entry: a ``GlobalStreamId`` built from the
    parent's name and the parent's ``_output``, mapped to
    ``Grouping.SHUFFLE``.
    """
    upstream = self._parent
    stream_id = GlobalStreamId(upstream.get_name(), upstream._output)
    return {stream_id: Grouping.SHUFFLE}
def _calculate_inputs(self):
    """Return the inputs mapping for this component.

    The mapping has a single entry: a ``GlobalStreamId`` built from the
    parent's name and the parent's ``_output``, mapped to
    ``Grouping.SHUFFLE``.
    """
    upstream = self._parent
    stream_id = GlobalStreamId(upstream.get_name(), upstream._output)
    return {stream_id: Grouping.SHUFFLE}