Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def emit(self, tup, stream=Stream.DEFAULT_STREAM_ID, anchors=None,
         direct_task=None, need_task_ids=False):
    """Logs the tuple and emits it through the parent bolt's ``emit``.

    When ``tup`` is None, a list copy of ``self.current_tuple_processing`` is
    emitted in its place.

    :param tup: the tuple to emit, or None to emit the tuple currently being processed
    :param stream: the ID of the stream to emit to (defaults to the default stream)
    :param anchors: forwarded unchanged to the parent ``emit``
    :param direct_task: forwarded unchanged to the parent ``emit``
    :param need_task_ids: forwarded unchanged to the parent ``emit``
    :return: whatever the parent ``emit`` returns. The value used to be discarded,
             so callers always received None.
             NOTE(review): presumably the list of task ids when ``need_task_ids``
             is True -- confirm against ``Bolt.emit``.
    """
    Log.info("emitting tuple: %s", tup)
    if tup is None:
        # no explicit payload: fall back to a copy of the tuple being processed
        tup = list(self.current_tuple_processing)
    # propagate the parent's result instead of dropping it
    return super(IntegrationTestBolt, self).emit(tup, stream=stream, anchors=anchors,
                                                 direct_task=direct_task,
                                                 need_task_ids=need_task_ids)
"""Base spout for integration tests"""
import copy
from heron.common.src.python.utils.log import Log
from heronpy.api.spout.spout import Spout
from heronpy.api.stream import Stream
from heronpy.api.component.component_spec import HeronComponentSpec
import heron.common.src.python.pex_loader as pex_loader
from ..core import constants as integ_const
class IntegrationTestSpout(Spout):
"""Base spout for integration test
Every spout of integration test topology consists of this instance, each delegating user's spout.
"""
outputs = [Stream(fields=[integ_const.INTEGRATION_TEST_TERMINAL],
name=integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)]
@classmethod
def spec(cls, name, par, config, user_spout_classpath, user_output_fields=None):
    """Builds the HeronComponentSpec for this integration test spout.

    The user's spout classpath is recorded in ``config`` (the passed-in mapping is
    modified in place) under ``integ_const.USER_SPOUT_CLASSPATH``.

    :param name: name handed to the component spec
    :param par: parallelism handed to the component spec
    :param config: config mapping; receives the user spout classpath entry
    :param user_spout_classpath: classpath of the user's spout
    :param user_output_fields: optional extra outputs appended after ``cls.outputs``
    :return: a HeronComponentSpec with ``is_spout=True`` and no inputs
    """
    config[integ_const.USER_SPOUT_CLASSPATH] = user_spout_classpath

    # work on a shallow copy so that the class-level ``outputs`` is never extended
    declared_outputs = copy.copy(cls.outputs)
    if user_output_fields is not None:
        declared_outputs.extend(user_output_fields)

    # fully qualified "<module>.<class>" path of this wrapper class
    class_path = ".".join((cls.__module__, cls.__name__))
    return HeronComponentSpec(name, class_path, is_spout=True, par=par,
                              inputs=None, outputs=declared_outputs, config=config)
def initialize(self, config, context):
user_spout_classpath = config.get(integ_const.USER_SPOUT_CLASSPATH, None)
from heron.common.src.python.utils.log import Log
from heronpy.api.bolt.bolt import Bolt
from heronpy.api.stream import Stream
from heronpy.api.component.component_spec import HeronComponentSpec
import heron.common.src.python.pex_loader as pex_loader
from ..core import constants as integ_const
from .batch_bolt import BatchBolt
# pylint: disable=missing-docstring
class IntegrationTestBolt(Bolt):
"""Base bolt for integration test
Every bolt of integration test topology consists of this instance, each delegating user's bolt.
"""
outputs = [Stream(fields=[integ_const.INTEGRATION_TEST_TERMINAL],
name=integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)]
@classmethod
def spec(cls, name, par, inputs, config, user_bolt_classpath, user_output_fields=None):
    """Builds the HeronComponentSpec for this integration test bolt.

    The user's bolt classpath is recorded in ``config`` (the passed-in mapping is
    modified in place) under ``integ_const.USER_BOLT_CLASSPATH``.

    :param name: name handed to the component spec
    :param par: parallelism handed to the component spec
    :param inputs: inputs handed to the component spec
    :param config: config mapping; receives the user bolt classpath entry
    :param user_bolt_classpath: classpath of the user's bolt
    :param user_output_fields: optional extra outputs appended after ``cls.outputs``
    :return: a HeronComponentSpec with ``is_spout=False``
    """
    config[integ_const.USER_BOLT_CLASSPATH] = user_bolt_classpath

    # work on a shallow copy so that the class-level ``outputs`` is never extended
    declared_outputs = copy.copy(cls.outputs)
    if user_output_fields is not None:
        declared_outputs.extend(user_output_fields)

    # fully qualified "<module>.<class>" path of this wrapper class
    class_path = ".".join((cls.__module__, cls.__name__))
    return HeronComponentSpec(name, class_path, is_spout=False, par=par,
                              inputs=inputs, outputs=declared_outputs, config=config)
def initialize(self, config, context):
user_bolt_classpath = config.get(integ_const.USER_BOLT_CLASSPATH, None)
if user_bolt_classpath is None:
def emit(self, tup, stream=Stream.DEFAULT_STREAM_ID,
anchors=None, direct_task=None, need_task_ids=False):
"""Emits a new tuple from this Bolt
It is compatible with StreamParse API.
:type tup: list or tuple
:param tup: the new output Tuple to send from this bolt,
should contain only serializable data.
:type stream: str
:param stream: the ID of the stream to emit this Tuple to.
Leave empty to emit to the default stream.
:type anchors: list
:param anchors: a list of HeronTuples to which the emitted Tuples should be anchored.
:type direct_task: int
:param direct_task: the task to send the Tuple to if performing a direct emit.
:type need_task_ids: bool
def emit(self, tup, tup_id=None, stream=Stream.DEFAULT_STREAM_ID,
direct_task=None, need_task_ids=False):
"""Emits a new tuple from this Spout
It is compatible with StreamParse API.
:type tup: list or tuple
:param tup: the new output Tuple to send from this spout,
should contain only serializable data.
:type tup_id: str or object
:param tup_id: the ID for the Tuple. Leave this blank for an unreliable emit.
(Same as messageId in Java)
:type stream: str
:param stream: the ID of the stream this Tuple should be emitted to.
Leave empty to emit to the default stream.
:type direct_task: int
:param direct_task: the task to send the Tuple to if performing a direct emit.
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""module for base streamlet API spout: StreamletSpoutBase"""
from heronpy.api.stream import Stream
class StreamletSpoutBase(object):
    """Base class for streamlet API spouts.

    Carries only the class-level output declaration below.
    """
    # output declarer: a single stream named 'output' with the one field '_output_'
    outputs = [Stream(name='output', fields=['_output_'])]
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""module for base streamlet bolt: StreamletBoltBase"""
from heronpy.api.stream import Stream
class StreamletBoltBase(object):
    """Base class for streamlet bolts.

    Carries only the class-level output declaration below.
    """
    # output declarer: a single stream named 'output' with the one field '_output_'
    outputs = [Stream(name='output', fields=['_output_'])]
def _sanitize_outputs(self):
    """Sanitizes output fields and returns a map <stream_id -> list of output fields>

    Plain-string entries of ``self.outputs`` are collected, in order, as fields of
    the default stream (``Stream.DEFAULT_STREAM_ID``). ``Stream`` entries contribute
    their ``fields`` under their own ``stream_id``; a default-stream ``Stream`` is
    merged into default-stream fields that were already collected.

    The returned lists are always fresh copies. The field list of a ``Stream``
    used to be stored by reference, so a later ``append``/``extend`` on the
    default-stream entry modified the ``Stream`` object itself and repeated calls
    kept growing it.

    :return: dict mapping stream id to its list of fields, or None when
             ``self.outputs`` is None
    :raises TypeError: if ``self.outputs`` is neither a list nor a tuple, or if one
                       of its entries is neither a str nor a Stream
    """
    if self.outputs is None:
        return None

    if not isinstance(self.outputs, (list, tuple)):
        raise TypeError("Argument to outputs must be either list or tuple, given: %s"
                        % str(type(self.outputs)))

    ret = {}
    for output in self.outputs:
        if not isinstance(output, (str, Stream)):
            raise TypeError("Outputs must be a list of strings or Streams, given: %s" % str(output))

        if isinstance(output, str):
            # a bare string is a field of the default stream
            ret.setdefault(Stream.DEFAULT_STREAM_ID, []).append(output)
        elif output.stream_id == Stream.DEFAULT_STREAM_ID and Stream.DEFAULT_STREAM_ID in ret:
            # some default stream fields are already in there
            ret[Stream.DEFAULT_STREAM_ID].extend(output.fields)
        else:
            # copy, so that later appends/extends never touch the Stream's own list
            ret[output.stream_id] = list(output.fields)
    return ret
"""Sanitizes output fields and returns a map <stream_id -> list of output fields>"""
ret = {}
if self.outputs is None:
return
if not isinstance(self.outputs, (list, tuple)):
raise TypeError("Argument to outputs must be either list or tuple, given: %s"
% str(type(self.outputs)))
for output in self.outputs:
if not isinstance(output, (str, Stream)):
raise TypeError("Outputs must be a list of strings or Streams, given: %s" % str(output))
if isinstance(output, str):
# it's a default stream
if Stream.DEFAULT_STREAM_ID not in ret:
ret[Stream.DEFAULT_STREAM_ID] = list()
ret[Stream.DEFAULT_STREAM_ID].append(output)
else:
# output is a Stream object
if output.stream_id == Stream.DEFAULT_STREAM_ID and Stream.DEFAULT_STREAM_ID in ret:
# some default stream fields are already in there
ret[Stream.DEFAULT_STREAM_ID].extend(output.fields)
else:
ret[output.stream_id] = output.fields
return ret