Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_topology_config_protobuf(mcs, class_dict):
    """Returns a topology-level Config protobuf message

    One key/value entry is appended for every item found in
    ``class_dict['_topo_config']``. ``str`` values are stored verbatim and
    tagged ``STRING_VALUE``; any other value is passed through
    ``default_serializer.serialize`` and tagged ``PYTHON_SERIALIZED_VALUE``.

    :param mcs: unused by this function
    :param class_dict: mapping that must contain the ``'_topo_config'`` dict
    :return: the populated ``topology_pb2.Config`` message
    :raises KeyError: if ``class_dict`` has no ``'_topo_config'`` entry
    """
    proto_config = topology_pb2.Config()
    for name, setting in class_dict['_topo_config'].items():
        entry = proto_config.kvs.add()
        entry.key = name
        if not isinstance(setting, str):
            # need to serialize
            entry.serialized_value = default_serializer.serialize(setting)
            entry.type = topology_pb2.ConfigValueType.Value("PYTHON_SERIALIZED_VALUE")
            continue
        entry.value = setting
        entry.type = topology_pb2.ConfigValueType.Value("STRING_VALUE")
    return proto_config
def _get_comp_config(self):
    """Returns component-specific Config protobuf message

    It first adds the ``TOPOLOGY_COMPONENT_PARALLELISM`` entry (the string
    form of ``self.parallelism``), and then appends one entry per item of the
    user-defined component-specific configuration (``self.custom_config``)
    after it has gone through ``self._sanitize_config``.

    ``str`` values are stored verbatim as ``STRING_VALUE``; any other value
    is serialized with ``default_serializer`` and stored as
    ``PYTHON_SERIALIZED_VALUE``, mirroring how the topology-level config is
    built, so that non-string settings are not silently dropped.

    :return: the populated ``topology_pb2.Config`` message
    """
    proto_config = topology_pb2.Config()

    # first add parallelism
    parallelism_kv = proto_config.kvs.add()
    parallelism_kv.key = TOPOLOGY_COMPONENT_PARALLELISM
    parallelism_kv.value = str(self.parallelism)
    parallelism_kv.type = topology_pb2.ConfigValueType.Value("STRING_VALUE")

    # iterate through self.custom_config
    if self.custom_config is not None:
        sanitized = self._sanitize_config(self.custom_config)
        for key, value in sanitized.items():
            kvs = proto_config.kvs.add()
            kvs.key = key
            if isinstance(value, str):
                kvs.value = value
                kvs.type = topology_pb2.ConfigValueType.Value("STRING_VALUE")
            else:
                # need to serialize
                kvs.serialized_value = default_serializer.serialize(value)
                kvs.type = topology_pb2.ConfigValueType.Value("PYTHON_SERIALIZED_VALUE")

    # Callers pass the result to CopyFrom(), so the message must be returned.
    return proto_config
def _get_base_component(self):
    """Returns Component protobuf message

    The message carries ``self.name`` as its name, the ``PYTHON_CLASS_NAME``
    object spec, ``self.python_class_path`` as the class name, and a copy of
    the Config produced by ``self._get_comp_config()``.

    :return: the populated ``topology_pb2.Component`` message
    """
    comp = topology_pb2.Component()
    comp.name = self.name
    comp.spec = topology_pb2.ComponentObjectSpec.Value("PYTHON_CLASS_NAME")
    comp.class_name = self.python_class_path
    comp.config.CopyFrom(self._get_comp_config())
    return comp
# NOTE(review): this is the tail of a routine whose ``def`` line and the
# condition guarding the early return below are not visible here;
# ``classname`` and ``class_dict`` come from that enclosing scope.
# Base class can't initialize protobuf
return
# Runtime options come from the environment via TopologyType.
heron_options = TopologyType.get_heron_options_from_env()
# The initial state falls back to "RUNNING" when the option is absent.
initial_state = heron_options.get("cmdline.topology.initial.state", "RUNNING")
# The temp directory has no fallback: it is mandatory.
tmp_directory = heron_options.get("cmdline.topologydefn.tmpdirectory")
if tmp_directory is None:
raise RuntimeError("Topology definition temp directory not specified")
# The topology name falls back to the class name; the id is the name with a
# random UUID4 appended directly (no separator).
topology_name = heron_options.get("cmdline.topology.name", classname)
topology_id = topology_name + str(uuid.uuid4())
# create protobuf
topology = topology_pb2.Topology()
topology.id = topology_id
topology.name = topology_name
# Value() looks the state up by its enum name.
topology.state = topology_pb2.TopologyState.Value(initial_state)
topology.topology_config.CopyFrom(TopologyType.get_topology_config_protobuf(class_dict))
TopologyType.add_bolts_and_spouts(topology, class_dict)
# Store the results in class_dict under fixed keys.
class_dict['topology_name'] = topology_name
class_dict['topology_id'] = topology_id
class_dict['protobuf_topology'] = topology
class_dict['topologydefn_tmpdir'] = tmp_directory
class_dict['heron_runtime_options'] = heron_options
def _get_stream_schema(fields):
    """Returns a StreamSchema protobuf message

    One key is appended per element of ``fields``, in iteration order, and
    every key is typed as ``OBJECT``.

    :param fields: iterable of field names
    :return: the populated ``topology_pb2.StreamSchema`` message
    """
    schema = topology_pb2.StreamSchema()
    for field_name in fields:
        schema_key = schema.keys.add()
        schema_key.key = field_name
        schema_key.type = topology_pb2.Type.Value("OBJECT")
    return schema
def _get_bolt(self):
    """Returns Bolt protobuf message

    The bolt's component part is copied from ``self._get_base_component()``;
    input streams and then output streams are attached afterwards.

    :return: the populated ``topology_pb2.Bolt`` message
    """
    bolt_proto = topology_pb2.Bolt()
    component_slot = bolt_proto.comp
    component_slot.CopyFrom(self._get_base_component())

    # Add streams: inputs first, then outputs
    self._add_in_streams(bolt_proto)
    self._add_out_streams(bolt_proto)
    return bolt_proto
# NOTE(review): this is the body of a routine whose ``def`` line is not
# visible here; ``bolt`` comes from that enclosing scope.
# sanitize inputs and get a map of global stream id -> Grouping
# (each key exposes ``component_id`` and ``stream_id``)
input_dict = self._sanitize_inputs()
# Add one input stream entry to the bolt per sanitized input.
for global_streamid, gtype in input_dict.items():
in_stream = bolt.inputs.add()
in_stream.stream.CopyFrom(self._get_stream_id(global_streamid.component_id,
global_streamid.stream_id))
if isinstance(gtype, Grouping.FIELDS):
# it's a field grouping
# the grouping fields are attached as a stream schema
in_stream.gtype = gtype.gtype
in_stream.grouping_fields.CopyFrom(self._get_stream_schema(gtype.fields))
elif isinstance(gtype, Grouping.CUSTOM):
# it's a custom grouping
# the pre-serialized grouping object is attached, tagged PYTHON_OBJECT
in_stream.gtype = gtype.gtype
in_stream.custom_grouping_object = gtype.python_serialized
in_stream.type = topology_pb2.CustomGroupingObjectType.Value("PYTHON_OBJECT")
else:
# any other grouping is assigned to gtype directly
in_stream.gtype = gtype