Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _calculate_inputs(self):
return {GlobalStreamId(self._parent.get_name(), self._parent._output) :
Grouping.custom(RepartitionCustomGrouping(self._repartition_function))}
def _calculate_inputs(self):
return {GlobalStreamId(self._parent.get_name(), self._parent._output) :
Grouping.custom("heronpy.streamlet.impl.reducebywindowbolt.ReduceGrouping")}
def _calculate_inputs(self):
return {GlobalStreamId(self._parent.get_name(), self._parent._output) :
Grouping.custom("heronpy.streamlet.impl.reducebykeyandwindowbolt.ReduceGrouping")}
def prepare(self, context, component, stream, target_tasks):
logging.getLogger().info("In prepare of SampleCustomGrouping, "
"with src component: %s, "
"with stream id: %s, "
"with target tasks: %s"
, component, stream, str(target_tasks))
self.target_tasks = target_tasks
def choose_tasks(self, values):
# only emits to the first task id
return [self.target_tasks[0]]
class CustomGrouping(Topology):
word_spout = WordSpout.spec(par=1)
consume_bolt = ConsumeBolt.spec(par=3,
inputs={word_spout: Grouping.custom(SampleCustomGrouping())},
config={constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10})