Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param stream_id: stream id
"""
# child has to be a bolt
child_component_spec = self.bolts[child]
# child_inputs is a dict mapping a parent component spec (or one of its streams) to a grouping
child_inputs = child_component_spec.inputs
if parent in self.bolts:
parent_component_spec = self.bolts[parent]
else:
parent_component_spec = self.spouts[parent]
if stream_id == Stream.DEFAULT_STREAM_ID:
child_inputs[parent_component_spec] = Grouping.ALL
else:
child_inputs[parent_component_spec[stream_id]] = Grouping.ALL
def none_grouping_builder(topology_name, http_server_url):
    """Integration test topology builder for none grouping.

    Registers one ``ABSpout`` under the name "ab-spout" and feeds it into an
    ``IdentityBolt`` named "identity-bolt" through ``Grouping.NONE``.

    :param topology_name: forwarded to ``TestTopologyBuilder``
    :param http_server_url: forwarded to ``TestTopologyBuilder``
    :return: the result of ``create_topology()`` on the builder
    """
    topology = TestTopologyBuilder(topology_name, http_server_url)
    source = topology.add_spout("ab-spout", ABSpout, 1)

    # the single bolt listens to the spout with NONE grouping
    bolt_inputs = {source: Grouping.NONE}
    topology.add_bolt(
        "identity-bolt",
        IdentityBolt,
        inputs=bolt_inputs,
        par=3,
        optional_outputs=['word'],
    )
    return topology.create_topology()
def multi_spouts_multi_tasks_builder(topology_name, http_server_url):
    """Integration test topology builder with two spouts feeding one bolt.

    Registers two ``ABSpout`` spouts ("ab-spout-1" and "ab-spout-2") and a
    single ``IdentityBolt`` that receives both through ``Grouping.SHUFFLE``.

    :param topology_name: forwarded to ``TestTopologyBuilder``
    :param http_server_url: forwarded to ``TestTopologyBuilder``
    :return: the result of ``create_topology()`` on the builder
    """
    topology = TestTopologyBuilder(topology_name, http_server_url)

    # register the spouts in order; each gets 3 as its third argument
    sources = []
    for spout_name in ("ab-spout-1", "ab-spout-2"):
        sources.append(topology.add_spout(spout_name, ABSpout, 3))

    # every spout is shuffle-grouped into the one bolt
    shuffle_inputs = dict.fromkeys(sources, Grouping.SHUFFLE)
    topology.add_bolt(
        "identity-bolt",
        IdentityBolt,
        inputs=shuffle_inputs,
        par=1,
        optional_outputs=['word'],
    )
    return topology.create_topology()
def one_spout_two_bolts_builder(topology_name, http_server_url):
    """Integration test topology builder with one spout feeding two bolts.

    Registers one ``ABSpout`` named "ab-spout" and two ``IdentityBolt`` bolts
    ("identity-bolt-1" and "identity-bolt-2"), each subscribed to the spout
    through ``Grouping.SHUFFLE``.

    :param topology_name: forwarded to ``TestTopologyBuilder``
    :param http_server_url: forwarded to ``TestTopologyBuilder``
    :return: the result of ``create_topology()`` on the builder
    """
    topology = TestTopologyBuilder(topology_name, http_server_url)
    source = topology.add_spout("ab-spout", ABSpout, 1)

    # both bolts are configured identically apart from their names;
    # a fresh inputs dict and outputs list is built for each call
    for bolt_name in ("identity-bolt-1", "identity-bolt-2"):
        topology.add_bolt(
            bolt_name,
            IdentityBolt,
            inputs={source: Grouping.SHUFFLE},
            par=1,
            optional_outputs=['word'],
        )

    return topology.create_topology()
def _add_in_streams(self, bolt):
    """Adds inputs to a given protobuf Bolt message

    One entry is appended to ``bolt.inputs`` for every item returned by
    ``self._sanitize_inputs()``. Nothing happens when ``self.inputs`` is None.

    :param bolt: message whose ``inputs`` collection receives the new entries
    """
    if self.inputs is None:
        return

    # sanitized inputs: stream key (component_id, stream_id) -> grouping
    for stream_key, grouping in self._sanitize_inputs().items():
        entry = bolt.inputs.add()
        stream_msg = self._get_stream_id(stream_key.component_id,
                                         stream_key.stream_id)
        entry.stream.CopyFrom(stream_msg)

        if isinstance(grouping, Grouping.FIELDS):
            # fields grouping: carries its own gtype plus the grouping fields
            entry.gtype = grouping.gtype
            schema = self._get_stream_schema(grouping.fields)
            entry.grouping_fields.CopyFrom(schema)
            continue

        if isinstance(grouping, Grouping.CUSTOM):
            # custom grouping: carries its own gtype plus a serialized object
            entry.gtype = grouping.gtype
            entry.custom_grouping_object = grouping.python_serialized
            entry.type = topology_pb2.CustomGroupingObjectType.Value("PYTHON_OBJECT")
            continue

        # any other grouping value is stored as the gtype directly
        entry.gtype = grouping
def _calculate_inputs(self):
    """Return the inputs mapping for this component.

    The mapping has a single entry: a ``GlobalStreamId`` built from the
    parent's name and the parent's ``_output``, mapped to ``Grouping.SHUFFLE``.
    """
    upstream = self._parent
    parent_stream = GlobalStreamId(upstream.get_name(), upstream._output)
    return {parent_stream: Grouping.SHUFFLE}