Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _calculate_inputs(self):
return {GlobalStreamId(self._left.get_name(), self._left._output) :
Grouping.SHUFFLE,
GlobalStreamId(self._right.get_name(), self._right._output) :
Grouping.SHUFFLE}
if self.inputs is None:
return
if isinstance(self.inputs, dict):
# inputs are dictionary, must be either Grouping> or
# Grouping>
for key, grouping in self.inputs.items():
if not Grouping.is_grouping_sane(grouping):
raise ValueError('A given grouping is not supported')
if isinstance(key, HeronComponentSpec):
# use default streamid
if key.name is None:
# should not happen as TopologyType metaclass sets name attribute
# before calling this method
raise RuntimeError("In _sanitize_inputs(): HeronComponentSpec doesn't have a name")
global_streamid = GlobalStreamId(key.name, Stream.DEFAULT_STREAM_ID)
ret[global_streamid] = grouping
elif isinstance(key, GlobalStreamId):
ret[key] = grouping
else:
raise ValueError("%s is not supported as a key to inputs" % str(key))
elif isinstance(self.inputs, (list, tuple)):
# inputs are lists, must be either a list of HeronComponentSpec or GlobalStreamId
# will use SHUFFLE grouping
for input_obj in self.inputs:
if isinstance(input_obj, HeronComponentSpec):
if input_obj.name is None:
# should not happen as TopologyType metaclass sets name attribute
# before calling this method
raise RuntimeError("In _sanitize_inputs(): HeronComponentSpec doesn't have a name")
global_streamid = GlobalStreamId(input_obj.name, Stream.DEFAULT_STREAM_ID)
ret[global_streamid] = Grouping.SHUFFLE
def _calculate_inputs(self):
return {GlobalStreamId(self._parent.get_name(), self._parent._output) :
Grouping.SHUFFLE}
def _calculate_inputs(self):
return {GlobalStreamId(self._parent.get_name(), self._parent._output) :
Grouping.SHUFFLE}
def _calculate_inputs(self):
return {GlobalStreamId(self._parent.get_name(), self._parent._output) :
Grouping.SHUFFLE}
def _calculate_inputs(self):
return {GlobalStreamId(self._parent.get_name(), self._parent._output) :
Grouping.custom("heronpy.streamlet.impl.reducebywindowbolt.ReduceGrouping")}
def _calculate_inputs(self):
return {GlobalStreamId(self._parent.get_name(), self._parent._output) :
Grouping.SHUFFLE}