Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _namespace(self, ns, private=False):
    """Return the namespace to use for ``ns``.

    When ``ns`` is empty, the result is ``self.private_ns`` if ``private``
    is set and ``self.namespace`` otherwise. A non-empty ``ns`` is handed
    to ``RosName.resolve``: against the private namespace when ``private``
    is set, against ``self.namespace`` otherwise.
    """
    private_ns = self.private_ns
    if ns:
        if private:
            return RosName.resolve(ns, private_ns, private_ns=private_ns)
        return RosName.resolve(ns, self.namespace)
    # Nothing to resolve: fall back to the scope's own namespace.
    return private_ns if private else self.namespace
# NOTE(review): orphaned excerpt -- the enclosing `def` line and the start of
# the first loop are not present here. It appears to duplicate the tail of
# `_parse_property`; confirm, then restore the missing header or remove it.
continue
# Resolve the event's topic name using the private namespace `pns`.
topic = RosName.resolve(event.topic, private_ns=pns)
# A topic without an entry in `topic_types` cannot be type-checked.
if topic not in topic_types:
raise HplTypeError(
"No type information for topic '{}'".format(topic))
rostype = topic_types[topic]
# Events with an alias contribute their topic type to `refs`.
if event.alias:
assert event.alias not in refs, "duplicate event alias"
assert rostype.is_message, "topic type is not a message"
# update refs without worries about temporal order of events
# prop.sanity_check() already checks that
refs[event.alias] = rostype
# Second pass: refine each publish event's predicate with its topic type,
# passing every collected alias type as a keyword argument.
for event in events:
if event is None or not event.is_publish:
continue
topic = RosName.resolve(event.topic, private_ns=pns)
rostype = topic_types[topic]
event.predicate.refine_types(rostype, **refs)
return ast
def remap(self, source, target):
    """Register a remapping from ``source`` to ``target``.

    Both names are passed through ``RosName.resolve`` with
    ``self.namespace`` and ``self.private_ns`` before the pair is stored
    in ``self.remaps`` (resolved source -> resolved target).
    """
    private_ns = self.private_ns
    # The comprehension resolves ``source`` first, then ``target``.
    resolved_source, resolved_target = [
        RosName.resolve(name, self.namespace, private_ns=private_ns)
        for name in (source, target)
    ]
    self.remaps[resolved_source] = resolved_target
def remap(self, source, target):
    """Store ``target`` as the remapped name of ``source``.

    Each name is resolved via ``RosName.resolve`` using ``self.namespace``
    and ``self.private_ns``; the resolved source becomes the key in
    ``self.remaps`` and the resolved target its value.
    """
    private_ns = self.private_ns

    def resolved(name):
        return RosName.resolve(name, self.namespace, private_ns=private_ns)

    # Tuple evaluation is left-to-right, so the source is resolved first.
    key, value = resolved(source), resolved(target)
    self.remaps[key] = value
def _parse_property(self, text, topic_types, pns=""):
    """Parse a property and refine the predicates of its publish events.

    Args:
        text: property source, handed to ``self.property_parser.parse``.
        topic_types: mapping-like object from resolved topic name to the
            type object of that topic.
        pns: private namespace passed to ``RosName.resolve`` when
            resolving event topic names.

    Returns:
        The parsed AST, after the predicate of every publish event has
        been refined with its topic type and the collected alias types.

    Raises:
        HplTypeError: a publish event names a topic that is missing from
            ``topic_types``.
        AssertionError: an event alias is used twice, or an aliased
            topic's type is not a message.
    """
    ast = self.property_parser.parse(text)
    events = (ast.scope.activator, ast.pattern.trigger,
              ast.pattern.behaviour, ast.scope.terminator)
    refs = {}
    typed_events = []
    # First pass: validate topics and collect the type of every alias.
    for event in events:
        if event is None or not event.is_publish:
            continue
        topic = RosName.resolve(event.topic, private_ns=pns)
        if topic not in topic_types:
            raise HplTypeError(
                "No type information for topic '{}'".format(topic))
        rostype = topic_types[topic]
        if event.alias:
            assert event.alias not in refs, "duplicate event alias"
            assert rostype.is_message, "topic type is not a message"
            # update refs without worries about temporal order of events
            # prop.sanity_check() already checks that
            refs[event.alias] = rostype
        # Remember the type so the second pass need not resolve again.
        typed_events.append((event, rostype))
    # Second pass: every alias is known now, so predicates may refer to
    # aliases introduced by any other event.
    for event, rostype in typed_events:
        event.predicate.refine_types(rostype, **refs)
    return ast
def transform(name, ns="/", private_ns="", remaps=None):
    """Resolve ``name`` and apply an optional remapping.

    ``name`` is resolved with ``RosName.resolve`` using ``ns`` and
    ``private_ns``. If ``remaps`` is non-empty, the resolved name is looked
    up in it and the mapped value is returned when present; otherwise the
    resolved name itself is returned.
    """
    resolved = RosName.resolve(name, ns=ns, private_ns=private_ns)
    return remaps.get(resolved, resolved) if remaps else resolved
def remove_param(self, name, ns, condition):
    """Mark an existing parameter as deleted when removal is unconditional.

    The parameter name is resolved against the joined namespace and must
    exist in ``self.configuration.parameters``. It is appended to
    ``self.resources.deleted_params`` only when ``condition`` is exactly
    ``True`` and ``self.conditions`` is empty; otherwise nothing is
    recorded.

    Raises:
        ConfigurationError: the resolved parameter is missing (or falsy)
            in the configuration.
    """
    # TODO check whether "~p" = "/rosparam/p" is intended or a bug
    base_ns = self._ns_join(ns or self.private_ns, self.private_ns)
    full_name = RosName.resolve(name, base_ns, "/rosparam")
    if not self.configuration.parameters.get(full_name):
        raise ConfigurationError("missing parameter: " + full_name)
    if condition is True and not self.conditions:
        self.resources.deleted_params.append(full_name)
def _namespace(self, ns, private=False):
    """Compute the effective namespace for ``ns``.

    An empty ``ns`` yields ``self.private_ns`` when ``private`` is set and
    ``self.namespace`` otherwise. Any other ``ns`` is resolved through
    ``RosName.resolve``, relative to the private namespace when ``private``
    is set and relative to ``self.namespace`` otherwise.
    """
    own_private = self.private_ns
    if not ns:
        if private:
            return own_private
        return self.namespace
    if not private:
        return RosName.resolve(ns, self.namespace)
    return RosName.resolve(ns, own_private, private_ns=own_private)
def _parse_assumption(self, text, topic_types, pns=""):
    """Parse an assumption and refine its predicate with the topic type.

    ``text`` is parsed by ``self.assumption_parser``; the topic of the
    resulting AST is resolved with ``pns`` as private namespace and looked
    up in ``topic_types``.

    Returns:
        The parsed AST, with its predicate refined against the topic type.

    Raises:
        HplTypeError: the resolved topic is missing from ``topic_types``.
    """
    ast = self.assumption_parser.parse(text)
    # assumptions, like events, also have .topic and .predicate
    resolved_topic = RosName.resolve(ast.topic, private_ns=pns)
    if resolved_topic in topic_types:
        ast.predicate.refine_types(topic_types[resolved_topic])
        return ast
    raise HplTypeError(
        "No type information for topic '{}'".format(resolved_topic))