Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
mymap = {}
for tup in tuples:
userdata = tup.values[0]
if not isinstance(userdata, collections.Iterable) or len(userdata) != 2:
raise RuntimeError("Join tuples must be iterable of length 2")
self._add(userdata[0], userdata[1], tup.component, mymap)
for (key, values) in mymap.items():
if self._join_type == JoinBolt.INNER:
if values[0] and values[1]:
self.inner_join_and_emit(key, values, window_config)
elif self._join_type == JoinBolt.OUTER_LEFT:
if values[0] and values[1]:
self.inner_join_and_emit(key, values, window_config)
elif values[0]:
self.outer_left_join_and_emit(key, values, window_config)
elif self._join_type == JoinBolt.OUTER_RIGHT:
if values[0] and values[1]:
self.inner_join_and_emit(key, values, window_config)
elif values[1]:
self.outer_right_join_and_emit(key, values, window_config)
elif self._join_type == JoinBolt.OUTER:
if values[0] and values[1]:
self.inner_join_and_emit(key, values, window_config)
elif values[0]:
self.outer_left_join_and_emit(key, values, window_config)
elif values[1]:
self.outer_right_join_and_emit(key, values, window_config)
self._add(userdata[0], userdata[1], tup.component, mymap)
for (key, values) in mymap.items():
if self._join_type == JoinBolt.INNER:
if values[0] and values[1]:
self.inner_join_and_emit(key, values, window_config)
elif self._join_type == JoinBolt.OUTER_LEFT:
if values[0] and values[1]:
self.inner_join_and_emit(key, values, window_config)
elif values[0]:
self.outer_left_join_and_emit(key, values, window_config)
elif self._join_type == JoinBolt.OUTER_RIGHT:
if values[0] and values[1]:
self.inner_join_and_emit(key, values, window_config)
elif values[1]:
self.outer_right_join_and_emit(key, values, window_config)
elif self._join_type == JoinBolt.OUTER:
if values[0] and values[1]:
self.inner_join_and_emit(key, values, window_config)
elif values[0]:
self.outer_left_join_and_emit(key, values, window_config)
elif values[1]:
self.outer_right_join_and_emit(key, values, window_config)
def _build_this(self, builder, stage_names):
print("join_build_this left: %s right: %s" % (self._left._built, self._right._built))
print("left: %s right: %s" % (self._left.get_name(), self._right.get_name()))
if not self._left._built or not self._right._built:
return False
if not self.get_name():
self.set_name(self._default_stage_name_calculator("join", stage_names))
if self.get_name() in stage_names:
raise RuntimeError("Duplicate Names")
stage_names.add(self.get_name())
builder.add_bolt(self.get_name(), JoinBolt, par=self.get_num_partitions(),
inputs=self._calculate_inputs(),
config={JoinBolt.WINDOWDURATION : self._window_config._window_duration.seconds,
JoinBolt.SLIDEINTERVAL : self._window_config._slide_interval.seconds,
JoinBolt.JOINEDCOMPONENT : self._right.get_name(),
JoinBolt.JOINFUNCTION : self._join_function,
JoinBolt.JOINTYPE : self._join_type})
return True
def join(self, join_streamlet, window_config, join_function):
"""Return a new Streamlet by joining join_streamlet with this streamlet
"""
from heronpy.streamlet.impl.joinbolt import JoinStreamlet, JoinBolt
join_streamlet_result = JoinStreamlet(JoinBolt.INNER, window_config,
join_function, self, join_streamlet)
self._add_child(join_streamlet_result)
join_streamlet._add_child(join_streamlet_result)
return join_streamlet_result
def initialize(self, config, context):
super(JoinBolt, self).initialize(config, context)
if not JoinBolt.JOINEDCOMPONENT in config:
raise RuntimeError("%s must be specified in the JoinBolt" % JoinBolt.JOINEDCOMPONENT)
self._joined_component = config[JoinBolt.JOINEDCOMPONENT]
if not JoinBolt.JOINFUNCTION in config:
raise RuntimeError("%s must be specified in the JoinBolt" % JoinBolt.JOINFUNCTION)
self._join_function = config[JoinBolt.JOINFUNCTION]
if not JoinBolt.JOINTYPE in config:
raise RuntimeError("%s must be specified in the JoinBolt" % JoinBolt.JOINTYPE)
self._join_type = config[JoinBolt.JOINTYPE]
def initialize(self, config, context):
super(JoinBolt, self).initialize(config, context)
if not JoinBolt.JOINEDCOMPONENT in config:
raise RuntimeError("%s must be specified in the JoinBolt" % JoinBolt.JOINEDCOMPONENT)
self._joined_component = config[JoinBolt.JOINEDCOMPONENT]
if not JoinBolt.JOINFUNCTION in config:
raise RuntimeError("%s must be specified in the JoinBolt" % JoinBolt.JOINFUNCTION)
self._join_function = config[JoinBolt.JOINFUNCTION]
if not JoinBolt.JOINTYPE in config:
raise RuntimeError("%s must be specified in the JoinBolt" % JoinBolt.JOINTYPE)
self._join_type = config[JoinBolt.JOINTYPE]
def _build_this(self, builder, stage_names):
print("join_build_this left: %s right: %s" % (self._left._built, self._right._built))
print("left: %s right: %s" % (self._left.get_name(), self._right.get_name()))
if not self._left._built or not self._right._built:
return False
if not self.get_name():
self.set_name(self._default_stage_name_calculator("join", stage_names))
if self.get_name() in stage_names:
raise RuntimeError("Duplicate Names")
stage_names.add(self.get_name())
builder.add_bolt(self.get_name(), JoinBolt, par=self.get_num_partitions(),
inputs=self._calculate_inputs(),
config={JoinBolt.WINDOWDURATION : self._window_config._window_duration.seconds,
JoinBolt.SLIDEINTERVAL : self._window_config._slide_interval.seconds,
JoinBolt.JOINEDCOMPONENT : self._right.get_name(),
JoinBolt.JOINFUNCTION : self._join_function,
JoinBolt.JOINTYPE : self._join_type})
return True
def _build_this(self, builder, stage_names):
print("join_build_this left: %s right: %s" % (self._left._built, self._right._built))
print("left: %s right: %s" % (self._left.get_name(), self._right.get_name()))
if not self._left._built or not self._right._built:
return False
if not self.get_name():
self.set_name(self._default_stage_name_calculator("join", stage_names))
if self.get_name() in stage_names:
raise RuntimeError("Duplicate Names")
stage_names.add(self.get_name())
builder.add_bolt(self.get_name(), JoinBolt, par=self.get_num_partitions(),
inputs=self._calculate_inputs(),
config={JoinBolt.WINDOWDURATION : self._window_config._window_duration.seconds,
JoinBolt.SLIDEINTERVAL : self._window_config._slide_interval.seconds,
JoinBolt.JOINEDCOMPONENT : self._right.get_name(),
JoinBolt.JOINFUNCTION : self._join_function,
JoinBolt.JOINTYPE : self._join_type})
return True
def initialize(self, config, context):
super(JoinBolt, self).initialize(config, context)
if not JoinBolt.JOINEDCOMPONENT in config:
raise RuntimeError("%s must be specified in the JoinBolt" % JoinBolt.JOINEDCOMPONENT)
self._joined_component = config[JoinBolt.JOINEDCOMPONENT]
if not JoinBolt.JOINFUNCTION in config:
raise RuntimeError("%s must be specified in the JoinBolt" % JoinBolt.JOINFUNCTION)
self._join_function = config[JoinBolt.JOINFUNCTION]
if not JoinBolt.JOINTYPE in config:
raise RuntimeError("%s must be specified in the JoinBolt" % JoinBolt.JOINTYPE)
self._join_type = config[JoinBolt.JOINTYPE]
def outer_left_join(self, join_streamlet, window_config, join_function):
"""Return a new Streamlet by left join_streamlet with this streamlet
"""
from heronpy.streamlet.impl.joinbolt import JoinStreamlet, JoinBolt
join_streamlet_result = JoinStreamlet(JoinBolt.OUTER_LEFT, window_config,
join_function, self, join_streamlet)
self._add_child(join_streamlet_result)
join_streamlet._add_child(join_streamlet_result)
return join_streamlet_result