Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _add_node(self, config, futures, name, datum):
    """Create a NodeInstance from a hints entry and queue its pending links.

    Args:
        config: configuration whose ``nodes`` collection is checked for an
            existing entry called ``name``; its ``name`` is used in the
            warning message.
        futures: list that receives the ``FutureNodeLinks`` built for the
            new instance.
        name: name of the node instance to create.
        datum: mapping describing the node. ``"node_type"`` is required and
            is split as ``"<package>/<executable>"``. ``"args"``,
            ``"remaps"``, ``"traceability"``, ``"publishers"``,
            ``"subscribers"``, ``"servers"``, ``"clients"``, ``"getters"``
            and ``"setters"`` are optional.

    Returns:
        ``False`` when ``config.nodes`` already holds ``name`` (a warning
        is logged and nothing is created); ``True`` once the instance has
        been registered and its links queued.

    Raises:
        KeyError: if ``datum`` is a dict without a ``"node_type"`` key.
        ValueError: if the ``"node_type"`` value contains no ``"/"``.
    """
    # NOTE(review): the existence check reads ``config.nodes`` while the new
    # instance is registered in ``self.configuration.nodes`` -- presumably
    # the same configuration; confirm against the caller.
    if config.nodes.get(name) is not None:
        self.log.warning(self._W_EXISTS, config.name, "node", name)
        return False
    # Split on the first "/" only; anything after it is the executable.
    pkg, exe = datum["node_type"].split("/", 1)
    args = datum.get("args", "")
    remaps = datum.get("remaps", {})
    node = self.find_node(pkg, exe, args)
    rosname = RosName(name)
    self.log.debug("[hints] Creating NodeInstance %s for Node %s.",
                   rosname.full, node.name)
    instance = NodeInstance(self.configuration, rosname, node,
                            argv=args, remaps=remaps)
    instance.location2 = JSON_to_loc2(datum.get("traceability"))
    node.instances.append(instance)
    self.configuration.nodes.add(instance)
    # The links are only recorded here; ``futures`` is handed back to the
    # caller, which is expected to process them later.
    fnl = FutureNodeLinks(instance)
    fnl.advertise.extend(datum.get("publishers", ()))
    fnl.subscribe.extend(datum.get("subscribers", ()))
    fnl.service.extend(datum.get("servers", ()))
    fnl.client.extend(datum.get("clients", ()))
    fnl.read_param.extend(datum.get("getters", ()))
    fnl.write_param.extend(datum.get("setters", ()))
    futures.append(fnl)
    return True
def __init__(self, name, ns="/", private_ns="", remaps=None):
    """Resolve ``name`` and cache its namespace and own-name parts.

    The name exactly as passed in is kept in ``_given``; the value
    returned by ``RosName.transform`` for the given ``ns``,
    ``private_ns`` and ``remaps`` is kept in ``_name``.
    """
    self._given = name
    self._name = RosName.transform(
        name, ns=ns, private_ns=private_ns, remaps=remaps
    )
    # Split at the last "/" only. An empty left part (e.g. "/a") falls back
    # to the root namespace "/". When the resolved name has no "/" at all,
    # the split yields one piece, so both attributes get the whole name.
    segments = self._name.rsplit("/", 1)
    self._ns = segments[0] or "/"
    self._own = segments[-1]
conditions=list(self.conditions))
if instance._location is not None:
instance._location.line = line
instance._location.column = col
node.instances.append(instance)
if not condition is True:
instance.conditions.append(condition)
previous = self.configuration.nodes.add(instance)
new_scope = LaunchScope(self, self.configuration, self.launch_file,
ns=ns, node=instance, remaps=instance.remaps,
params=self.parameters, args=self.arguments,
conditions=instance.conditions)
self.children.append(new_scope)
pns = new_scope.private_ns
for param in self._params:
rosname = RosName(param.rosname.given, pns, pns)
conditions = param.conditions + instance.conditions
self.log.debug("Creating new forward Parameter %s.", rosname.full)
new_param = Parameter(self.configuration, rosname, param.type,
param.value, node_scope=param.node_scope,
launch=param.launch_file, conditions=conditions)
new_param._location = param._location
self.parameters.append(new_param)
return new_scope
def test_rosname():
    """Check RosName equality against other RosNames and plain strings."""
    # A bare name and the same name given a namespace must differ.
    bare = RosName("a")
    namespaced = RosName("a", "ns")
    assert bare != namespaced

    # String comparison has to work with the RosName on either side.
    assert bare == "/a"
    assert "/a" == bare
    assert namespaced == "ns/a"
    assert "ns/a" == namespaced

    # Names built from identical arguments compare equal.
    namespaced_twin = RosName("a", "ns")
    assert namespaced_twin == namespaced
    first_bare = RosName("a")
    second_bare = RosName("a")
    assert first_bare == second_bare

    # A "~" name resolves against the private namespace, not ``ns``.
    private = RosName("~a", "ns", "priv")
    public = RosName("a", "ns")
    assert private != public
    assert private == "priv/a"

    # A Node exposes the RosName it was given, comparable to a string.
    base_node = Node("base", Package("pkg"), rosname=RosName("base"))
    assert base_node.rosname == "/base"
def make_node(self, node, name, ns, args, condition, line=None, col=None):
    """Instantiate ``node`` within this scope and build its child scope.

    Args:
        node: node definition to instantiate; the new instance is appended
            to ``node.instances``.
        name: instance name; when falsy, ``node.rosname.own`` is used.
        ns: namespace, passed through ``self._namespace`` before use.
        args: value forwarded to the instance as ``argv``.
        condition: extra condition for the instance; appended to the
            instance's conditions unless it is exactly ``True``.
        line: line number stored on the instance's location, if it has one.
        col: column number stored on the instance's location, if it has one.

    NOTE(review): the visible code stops once the child ``LaunchScope`` has
    been constructed; no value is returned within these lines.
    """
    ns = self._namespace(ns)
    name = name or node.rosname.own
    rosname = RosName(name, ns, self.private_ns)
    self.log.debug("Creating NodeInstance %s for Node %s.",
                   rosname.full, node.name)
    # Hand the instance copies of this scope's remaps and conditions.
    instance = NodeInstance(self.configuration, rosname, node,
                            launch=self.launch_file, argv=args,
                            remaps=dict(self.remaps),
                            conditions=list(self.conditions))
    if instance._location is not None:
        instance._location.line = line
        instance._location.column = col
    node.instances.append(instance)
    # Identity check on purpose: only the literal ``True`` is skipped, any
    # other value (including other truthy ones) is recorded.
    if condition is not True:
        instance.conditions.append(condition)
    self.configuration.nodes.add(instance)
    new_scope = LaunchScope(self, self.configuration, self.launch_file,
                            ns=ns, node=instance, remaps=instance.remaps,
                            params=self.parameters, args=self.arguments,
                            conditions=instance.conditions)
def test_rosname():
    """Exercise RosName comparisons with other RosNames and with strings."""
    plain = RosName("a")
    scoped = RosName("a", "ns")
    # Same own name, different namespace: not equal.
    assert plain != scoped
    # Equality with strings is checked in both operand orders.
    assert plain == "/a"
    assert "/a" == plain
    assert scoped == "ns/a"
    assert "ns/a" == scoped

    # Rebuilding a name from the same arguments gives an equal name.
    scoped_copy = RosName("a", "ns")
    assert scoped_copy == scoped
    plain_one = RosName("a")
    plain_two = RosName("a")
    assert plain_one == plain_two

    # A "~" name is resolved with the private namespace instead of ``ns``.
    private_name = RosName("~a", "ns", "priv")
    public_name = RosName("a", "ns")
    assert private_name != public_name
    assert private_name == "priv/a"

    # NOTE(review): the node is constructed but nothing is asserted on it
    # within the visible lines of this function.
    n = Node("base", Package("pkg"), rosname=RosName("base"))
def _link_from_call(self, call, link_cls, resource_cls, collection):
    """Build a link between this node and the resource named by ``call``.

    The resource is looked up in ``collection`` by its remapped full name
    and created through ``self._new_resource`` (then added to
    ``collection``) when it is missing.

    Args:
        call: call object providing ``namespace``, ``name`` and
            ``variables()``.
        link_cls: class whose ``link_from_call`` builds the returned link.
        resource_cls: resource class; used for log messages and passed to
            ``self._new_resource``.
        collection: resource collection supporting ``get`` and ``add``.

    Returns:
        Whatever ``link_cls.link_from_call`` returns.
    """
    namespace = RosName.resolve_ns(call.namespace, ns=self.ns,
                                   private_ns=self.pns)
    # The same call name is resolved twice: once as written, and once with
    # the node's remappings applied. The remapped name identifies the
    # resource; the unmapped one is passed on to the link.
    unmapped = RosName(call.name, ns=namespace, private_ns=self.pns)
    remapped = RosName(call.name, ns=namespace, private_ns=self.pns,
                       remaps=self.node.remaps)
    kind = resource_cls.__name__
    self.log.debug("Creating %s link for %s (%s).",
                   kind, unmapped.full, remapped.full)
    resource = collection.get(remapped.full)
    if resource is None:
        self.log.debug("No %s named '%s' was found. Creating new Resource.",
                       kind, remapped.full)
        resource = self._new_resource(remapped, call, resource_cls)
        collection.add(resource)
    else:
        self.log.debug("Found %s '%s' within collection.",
                       kind, resource.id)
    # NOTE(review): the call's variable names are merged into the resource
    # whether it was found or just created -- confirm this is intended
    # rather than applying to newly created resources only.
    resource.variables.extend(uv.name for uv in call.variables())
    return link_cls.link_from_call(self.node, resource, unmapped, call)