Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _add_node(self, config, futures, name, datum):
if config.nodes.get(name) is not None:
self.log.warning(self._W_EXISTS, config.name, "node", name)
return False
pkg, exe = datum["node_type"].split("/", 1)
args = datum.get("args", "")
remaps = datum.get("remaps", {})
node = self.find_node(pkg, exe, args)
rosname = RosName(name)
self.log.debug("[hints] Creating NodeInstance %s for Node %s.",
rosname.full, node.name)
instance = NodeInstance(self.configuration, rosname, node,
argv=args, remaps=remaps)
instance.location2 = JSON_to_loc2(datum.get("traceability"))
node.instances.append(instance)
previous = self.configuration.nodes.add(instance)
fnl = FutureNodeLinks(instance)
fnl.advertise.extend(datum.get("publishers", ()))
fnl.subscribe.extend(datum.get("subscribers", ()))
fnl.service.extend(datum.get("servers", ()))
fnl.client.extend(datum.get("clients", ()))
fnl.read_param.extend(datum.get("getters", ()))
fnl.write_param.extend(datum.get("setters", ()))
futures.append(fnl)
return True
create.append(call)
else:
if not matches:
self.log.warning("No matches: %s", shint)
continue
if len(matches) > 1:
self.log.warning("Many matches: %s", shint)
continue
call = matches[0]
if "original_name" in shint:
call.name = shint["original_name"] or "/?"
call.namespace = "/"
if "srv_type" in shint:
call.type = shint["srv_type"]
if "traceability" in shint:
call.location2 = JSON_to_loc2(shint["traceability"])
if "conditional" in shint:
v = _bool_to_conditions(not shint["conditional"])
call.conditions = v
calls.extend(create)
def _add_param(self, config, name, datum):
if config.parameters.get(name) is not None:
self.log.warning(self._W_EXISTS, config.name, "param", name)
return False
ptype = datum["param_type"]
value = datum.get("default_value")
param = Parameter(config, RosName(name), ptype, value)
param.location2 = JSON_to_loc2(datum.get("traceability"))
config.parameters.add(param)
return True
new_name = datum["rosname"] or "/?"
if config.nodes.get(new_name) is not None:
self.log.warning(self._W_COLLIDES, config.name, "node", new_name)
return False
config.nodes.remove(name)
node.rosname = RosName(new_name)
config.nodes.add(node)
if "node_type" in datum:
self.log.warning(self._W_UNSUPPORTED, config.name,
"node", "node_type")
if "args" in datum:
node.argv = datum["args"]
if "remaps" in datum:
node.remaps = dict(datum["remaps"])
if "traceability" in datum:
node.location2 = JSON_to_loc2(datum["traceability"])
if "conditional" in datum:
v = _bool_to_conditions(not datum["conditional"])
node.conditions = v
# BY FIRE BE PURGED ---------------------------------------------------
for fnl in futures:
if fnl.node is not node:
continue
fnl.advertise.extend(datum.get("publishers", ()))
fnl.subscribe.extend(datum.get("subscribers", ()))
fnl.service.extend(datum.get("servers", ()))
fnl.client.extend(datum.get("clients", ()))
fnl.read_param.extend(datum.get("getters", ()))
fnl.write_param.extend(datum.get("setters", ()))
# ---------------------------------------------------------------------
return True
create.append(call)
else:
if not matches:
self.log.warning("No matches: %s", shint)
continue
if len(matches) > 1:
self.log.warning("Many matches: %s", shint)
continue
call = matches[0]
if "original_name" in shint:
call.name = shint["original_name"] or "/?"
call.namespace = "/"
if "srv_type" in shint:
call.type = shint["srv_type"]
if "traceability" in shint:
call.location2 = JSON_to_loc2(shint["traceability"])
if "conditional" in shint:
v = _bool_to_conditions(not shint["conditional"])
call.conditions = v
calls.extend(create)
create = []
config = self.node.configuration
for shint in self.advertise:
g = shint["topic"]
matches = []
for call in calls:
ns = RosName.resolve_ns(call.namespace,
ns=self.ns, private_ns=self.pns)
rosname = RosName(call.name, ns=ns, private_ns=self.pns,
remaps=self.node.remaps)
if g == rosname.full:
matches.append(call)
if shint.get("create", False):
call = AdvertiseCall(g, "/", shint["msg_type"],
shint["queue_size"], latched=shint.get("latched", False))
call.location2 = JSON_to_loc2(shint.get("traceability"))
create.append(call)
else:
if not matches:
self.log.warning("No matches: %s", shint)
continue
if len(matches) > 1:
self.log.warning("Many matches: %s", shint)
continue
call = matches[0]
if "original_name" in shint:
call.name = shint["original_name"] or "/?"
call.namespace = "/"
if "msg_type" in shint:
call.type = shint["msg_type"]
if "queue_size" in shint:
call.queue_size = shint["queue_size"]
if not matches:
self.log.warning("No matches: %s", shint)
continue
if len(matches) > 1:
self.log.warning("Many matches: %s", shint)
continue
call = matches[0]
if "original_name" in shint:
call.name = shint["original_name"] or "/?"
call.namespace = "/"
if "param_type" in shint:
call.type = shint["param_type"]
if "value" in shint:
call.value = shint["value"]
if "traceability" in shint:
call.location2 = JSON_to_loc2(shint["traceability"])
if "conditional" in shint:
v = _bool_to_conditions(not shint["conditional"])
call.conditions = v
calls.extend(create)
# -------------------------------------------------------------------------
if not matches:
self.log.warning("No matches: %s", shint)
continue
if len(matches) > 1:
self.log.warning("Many matches: %s", shint)
continue
call = matches[0]
if "original_name" in shint:
call.name = shint["original_name"] or "/?"
call.namespace = "/"
if "msg_type" in shint:
call.type = shint["msg_type"]
if "queue_size" in shint:
call.queue_size = shint["queue_size"]
if "traceability" in shint:
call.location2 = JSON_to_loc2(shint["traceability"])
if "conditional" in shint:
v = _bool_to_conditions(not shint["conditional"])
call.conditions = v
calls.extend(create)