Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# terminal_outputs are output fields for terminals, list of either str or Stream
for terminal in terminals:
if terminal in self.bolts:
terminal_outputs = self.bolts[terminal].outputs
else:
terminal_outputs = self.spouts[terminal].outputs
# now get a set of stream ids
stream_ids = ["default" if isinstance(out, str) else out.stream_id
for out in terminal_outputs]
for stream_id in set(stream_ids):
self._add_all_grouping(self.TERMINAL_BOLT_NAME, terminal, stream_id)
# create topology class
class_dict = self._construct_topo_class_dict()
return TopologyType(self.topology_name, (Topology,), class_dict)
def build_and_submit(self):
"""Builds the topology and submits to the destination"""
class_dict = self._construct_topo_class_dict()
topo_cls = TopologyType(self.topology_name, (Topology,), class_dict)
topo_cls.write()
def __new__(mcs, classname, bases, class_dict):
bolt_specs = {}
spout_specs = {}
# Copy HeronComponentSpec items out of class_dict
specs = TopologyType.class_dict_to_specs(class_dict)
for spec in iter(specs.values()):
if spec.is_spout:
TopologyType.add_spout_specs(spec, spout_specs)
else:
TopologyType.add_bolt_specs(spec, bolt_specs)
if classname != 'Topology' and not spout_specs:
raise ValueError("A Topology requires at least one Spout")
topology_config = TopologyType.class_dict_to_topo_config(class_dict)
if classname != 'Topology':
class_dict['_topo_config'] = topology_config
class_dict['_protobuf_bolts'] = bolt_specs
class_dict['_protobuf_spouts'] = spout_specs
class_dict['_heron_specs'] = list(specs.values())
# create topology protobuf here
TopologyType.init_topology(classname, class_dict)
initial_state = heron_options.get("cmdline.topology.initial.state", "RUNNING")
tmp_directory = heron_options.get("cmdline.topologydefn.tmpdirectory")
if tmp_directory is None:
raise RuntimeError("Topology definition temp directory not specified")
topology_name = heron_options.get("cmdline.topology.name", classname)
topology_id = topology_name + str(uuid.uuid4())
# create protobuf
topology = topology_pb2.Topology()
topology.id = topology_id
topology.name = topology_name
topology.state = topology_pb2.TopologyState.Value(initial_state)
topology.topology_config.CopyFrom(TopologyType.get_topology_config_protobuf(class_dict))
TopologyType.add_bolts_and_spouts(topology, class_dict)
class_dict['topology_name'] = topology_name
class_dict['topology_id'] = topology_id
class_dict['protobuf_topology'] = topology
class_dict['topologydefn_tmpdir'] = tmp_directory
class_dict['heron_runtime_options'] = heron_options
return
heron_options = TopologyType.get_heron_options_from_env()
initial_state = heron_options.get("cmdline.topology.initial.state", "RUNNING")
tmp_directory = heron_options.get("cmdline.topologydefn.tmpdirectory")
if tmp_directory is None:
raise RuntimeError("Topology definition temp directory not specified")
topology_name = heron_options.get("cmdline.topology.name", classname)
topology_id = topology_name + str(uuid.uuid4())
# create protobuf
topology = topology_pb2.Topology()
topology.id = topology_id
topology.name = topology_name
topology.state = topology_pb2.TopologyState.Value(initial_state)
topology.topology_config.CopyFrom(TopologyType.get_topology_config_protobuf(class_dict))
TopologyType.add_bolts_and_spouts(topology, class_dict)
class_dict['topology_name'] = topology_name
class_dict['topology_id'] = topology_id
class_dict['protobuf_topology'] = topology
class_dict['topologydefn_tmpdir'] = tmp_directory
class_dict['heron_runtime_options'] = heron_options
def __new__(mcs, classname, bases, class_dict):
bolt_specs = {}
spout_specs = {}
# Copy HeronComponentSpec items out of class_dict
specs = TopologyType.class_dict_to_specs(class_dict)
for spec in iter(specs.values()):
if spec.is_spout:
TopologyType.add_spout_specs(spec, spout_specs)
else:
TopologyType.add_bolt_specs(spec, bolt_specs)
if classname != 'Topology' and not spout_specs:
raise ValueError("A Topology requires at least one Spout")
topology_config = TopologyType.class_dict_to_topo_config(class_dict)
if classname != 'Topology':
class_dict['_topo_config'] = topology_config
class_dict['_protobuf_bolts'] = bolt_specs
class_dict['_protobuf_spouts'] = spout_specs
class_dict['_heron_specs'] = list(specs.values())
# create topology protobuf here
TopologyType.init_topology(classname, class_dict)
return type.__new__(mcs, classname, bases, class_dict)
def init_topology(mcs, classname, class_dict):
"""Initializes a topology protobuf"""
if classname == 'Topology':
# Base class can't initialize protobuf
return
heron_options = TopologyType.get_heron_options_from_env()
initial_state = heron_options.get("cmdline.topology.initial.state", "RUNNING")
tmp_directory = heron_options.get("cmdline.topologydefn.tmpdirectory")
if tmp_directory is None:
raise RuntimeError("Topology definition temp directory not specified")
topology_name = heron_options.get("cmdline.topology.name", classname)
topology_id = topology_name + str(uuid.uuid4())
# create protobuf
topology = topology_pb2.Topology()
topology.id = topology_id
topology.name = topology_name
topology.state = topology_pb2.TopologyState.Value(initial_state)
topology.topology_config.CopyFrom(TopologyType.get_topology_config_protobuf(class_dict))
TopologyType.add_bolts_and_spouts(topology, class_dict)
def __new__(mcs, classname, bases, class_dict):
bolt_specs = {}
spout_specs = {}
# Copy HeronComponentSpec items out of class_dict
specs = TopologyType.class_dict_to_specs(class_dict)
for spec in iter(specs.values()):
if spec.is_spout:
TopologyType.add_spout_specs(spec, spout_specs)
else:
TopologyType.add_bolt_specs(spec, bolt_specs)
if classname != 'Topology' and not spout_specs:
raise ValueError("A Topology requires at least one Spout")
topology_config = TopologyType.class_dict_to_topo_config(class_dict)
if classname != 'Topology':
class_dict['_topo_config'] = topology_config
class_dict['_protobuf_bolts'] = bolt_specs
class_dict['_protobuf_spouts'] = spout_specs
class_dict['_heron_specs'] = list(specs.values())
TopologyType.add_spout_specs(spec, spout_specs)
else:
TopologyType.add_bolt_specs(spec, bolt_specs)
if classname != 'Topology' and not spout_specs:
raise ValueError("A Topology requires at least one Spout")
topology_config = TopologyType.class_dict_to_topo_config(class_dict)
if classname != 'Topology':
class_dict['_topo_config'] = topology_config
class_dict['_protobuf_bolts'] = bolt_specs
class_dict['_protobuf_spouts'] = spout_specs
class_dict['_heron_specs'] = list(specs.values())
# create topology protobuf here
TopologyType.init_topology(classname, class_dict)
return type.__new__(mcs, classname, bases, class_dict)
def __new__(mcs, classname, bases, class_dict):
bolt_specs = {}
spout_specs = {}
# Copy HeronComponentSpec items out of class_dict
specs = TopologyType.class_dict_to_specs(class_dict)
for spec in iter(specs.values()):
if spec.is_spout:
TopologyType.add_spout_specs(spec, spout_specs)
else:
TopologyType.add_bolt_specs(spec, bolt_specs)
if classname != 'Topology' and not spout_specs:
raise ValueError("A Topology requires at least one Spout")
topology_config = TopologyType.class_dict_to_topo_config(class_dict)
if classname != 'Topology':
class_dict['_topo_config'] = topology_config
class_dict['_protobuf_bolts'] = bolt_specs
class_dict['_protobuf_spouts'] = spout_specs
class_dict['_heron_specs'] = list(specs.values())
# create topology protobuf here
TopologyType.init_topology(classname, class_dict)
return type.__new__(mcs, classname, bases, class_dict)