Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
},
"kobuki_msgs/BumperEvent": {
"bumper": TypeToken("uint8"),
"state": TypeToken("uint8")
},
"pkg/Msg": {
"int": TypeToken("int32"),
"float": TypeToken("float64"),
"string": TypeToken("string"),
"twist": TypeToken("geometry_msgs/Twist"),
"int_list": ArrayTypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"float_list": ArrayTypeToken("float64"),
"float_array": ArrayTypeToken("float64", length=3),
"string_list": ArrayTypeToken("string"),
"string_array": ArrayTypeToken("string", length=3),
"twist_list": ArrayTypeToken("geometry_msgs/Twist"),
"twist_array": ArrayTypeToken("geometry_msgs/Twist", length=3),
"nested_array": ArrayTypeToken("pkg/Nested", length=3)
},
"pkg/Nested": {
"int": TypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"nested_array": ArrayTypeToken("pkg/Nested2", length=3)
},
"pkg/Nested2": {
"int": TypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3)
}
}
fields = (
"state": TypeToken("uint8")
},
"pkg/Msg": {
"int": TypeToken("int32"),
"float": TypeToken("float64"),
"string": TypeToken("string"),
"twist": TypeToken("geometry_msgs/Twist"),
"int_list": ArrayTypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"float_list": ArrayTypeToken("float64"),
"float_array": ArrayTypeToken("float64", length=3),
"string_list": ArrayTypeToken("string"),
"string_array": ArrayTypeToken("string", length=3),
"twist_list": ArrayTypeToken("geometry_msgs/Twist"),
"twist_array": ArrayTypeToken("geometry_msgs/Twist", length=3),
"nested_array": ArrayTypeToken("pkg/Nested", length=3)
},
"pkg/Nested": {
"int": TypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"nested_array": ArrayTypeToken("pkg/Nested2", length=3)
},
"pkg/Nested2": {
"int": TypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3)
}
}
fields = (
HplFieldReference("nested_array[all]",
TEST_DATA["pkg/Nested"]["nested_array"], "pkg/Nested"),
HplFieldReference("int", TEST_DATA["pkg/Nested2"]["int"], "pkg/Nested2")
"x": TypeToken("float64"),
"y": TypeToken("float64"),
"z": TypeToken("float64")
},
"kobuki_msgs/BumperEvent": {
"bumper": TypeToken("uint8"),
"state": TypeToken("uint8")
},
"pkg/Msg": {
"int": TypeToken("int32"),
"float": TypeToken("float64"),
"string": TypeToken("string"),
"twist": TypeToken("geometry_msgs/Twist"),
"int_list": ArrayTypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"float_list": ArrayTypeToken("float64"),
"float_array": ArrayTypeToken("float64", length=3),
"string_list": ArrayTypeToken("string"),
"string_array": ArrayTypeToken("string", length=3),
"twist_list": ArrayTypeToken("geometry_msgs/Twist"),
"twist_array": ArrayTypeToken("geometry_msgs/Twist", length=3),
"nested_array": ArrayTypeToken("pkg/Nested", length=3)
},
"pkg/Nested": {
"int": TypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"nested_array": ArrayTypeToken("pkg/Nested2", length=3)
},
"pkg/Nested2": {
"int": TypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3)
}
def _get_class_fields(self, msg_class, msg_queue):
    """Map every field of a message class to its type token.

    Field names are read from ``msg_class.__slots__`` and the matching
    type strings from ``msg_class._slot_types``; both sequences are
    walked in lockstep.

    Args:
        msg_class: class exposing the parallel ``__slots__`` and
            ``_slot_types`` sequences.
        msg_queue: list that is appended to, in place, with the base type
            of each field whose base type contains "/" and is not
            "std_msgs/Header". Duplicates are not filtered here.

    Returns:
        dict mapping field name to an ``ArrayTypeToken`` (array fields)
        or a ``TypeToken`` (all other fields).
    """
    self.log.debug("extracting msg fields")
    fields = {}
    # NOTE(review): assumes __slots__ and _slot_types have the same length;
    # zip() stops at the shorter one -- confirm against the message classes.
    for field_name, field_type in zip(msg_class.__slots__,
                                      msg_class._slot_types):
        base_type, is_array, length = parse_type(field_type)
        if is_array:
            fields[field_name] = ArrayTypeToken(base_type, length=length)
        else:
            fields[field_name] = TypeToken(base_type)
        # Types with a package prefix are queued for the caller, except the
        # header type, which is explicitly excluded.
        if "/" in base_type and base_type != "std_msgs/Header":
            msg_queue.append(base_type)
    return fields
################################################################################
# Test Code
################################################################################
if __name__ == "__main__":
from .hpl_ast import (HplReceiveStatement, HplMsgFilter, HplProperty,
HplMsgFieldCondition, HplFieldReference, HplPublishStatement,
HplTimeBound, HplLiteral)
from .ros_types import (TypeToken, ArrayTypeToken)
TEST_DATA = {
"geometry_msgs/Twist": {
"linear": TypeToken("geometry_msgs/Vector3"),
"angular": TypeToken("geometry_msgs/Vector3")
},
"geometry_msgs/Vector3": {
"x": TypeToken("float64"),
"y": TypeToken("float64"),
"z": TypeToken("float64")
},
"kobuki_msgs/BumperEvent": {
"bumper": TypeToken("uint8"),
"state": TypeToken("uint8")
},
"pkg/Msg": {
"int": TypeToken("int32"),
"float": TypeToken("float64"),
"string": TypeToken("string"),
"twist": TypeToken("geometry_msgs/Twist"),
"int_list": ArrayTypeToken("int32"),
"angular": TypeToken("geometry_msgs/Vector3")
},
"geometry_msgs/Vector3": {
"x": TypeToken("float64"),
"y": TypeToken("float64"),
"z": TypeToken("float64")
},
"kobuki_msgs/BumperEvent": {
"bumper": TypeToken("uint8"),
"state": TypeToken("uint8")
},
"pkg/Msg": {
"int": TypeToken("int32"),
"float": TypeToken("float64"),
"string": TypeToken("string"),
"twist": TypeToken("geometry_msgs/Twist"),
"int_list": ArrayTypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"float_list": ArrayTypeToken("float64"),
"float_array": ArrayTypeToken("float64", length=3),
"string_list": ArrayTypeToken("string"),
"string_array": ArrayTypeToken("string", length=3),
"twist_list": ArrayTypeToken("geometry_msgs/Twist"),
"twist_array": ArrayTypeToken("geometry_msgs/Twist", length=3),
"nested_array": ArrayTypeToken("pkg/Nested", length=3)
},
"pkg/Nested": {
"int": TypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"nested_array": ArrayTypeToken("pkg/Nested2", length=3)
},
"pkg/Nested2": {
"linear": TypeToken("geometry_msgs/Vector3"),
"angular": TypeToken("geometry_msgs/Vector3")
},
"geometry_msgs/Vector3": {
"x": TypeToken("float64"),
"y": TypeToken("float64"),
"z": TypeToken("float64")
},
"kobuki_msgs/BumperEvent": {
"bumper": TypeToken("uint8"),
"state": TypeToken("uint8")
},
"pkg/Msg": {
"int": TypeToken("int32"),
"float": TypeToken("float64"),
"string": TypeToken("string"),
"twist": TypeToken("geometry_msgs/Twist"),
"int_list": ArrayTypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"float_list": ArrayTypeToken("float64"),
"float_array": ArrayTypeToken("float64", length=3),
"string_list": ArrayTypeToken("string"),
"string_array": ArrayTypeToken("string", length=3),
"twist_list": ArrayTypeToken("geometry_msgs/Twist"),
"twist_array": ArrayTypeToken("geometry_msgs/Twist", length=3),
"nested_array": ArrayTypeToken("pkg/Nested", length=3)
},
"pkg/Nested": {
"int": TypeToken("int32"),
"int_array": ArrayTypeToken("int32", length=3),
"nested_array": ArrayTypeToken("pkg/Nested2", length=3)
},
)
# Left operand: built from the `fields` tuple bound just before this point.
# NOTE(review): the trailing "m" and "pkg/Nested" arguments presumably are the
# message variable name and message type -- confirm against HplFieldExpression.
left = HplFieldExpression("nested_array[all].int", fields, "m", "pkg/Nested")
# Re-bind `fields` to a one-element tuple referencing the "int" entry of
# TEST_DATA["pkg/Nested"].
fields = (
HplFieldReference("int", TEST_DATA["pkg/Nested"]["int"], "pkg/Nested"),
)
right = HplFieldExpression("int", fields, "m", "pkg/Nested")
# Single condition relating both expressions with OPERATOR_EQ, wrapped in a
# filter that holds only that condition.
cond1 = HplMsgFieldCondition(left, OPERATOR_EQ, right)
msg_filter = HplMsgFilter((cond1,))
# Receive statement for "/topic" that carries the filter built above.
hpl_receive = HplReceiveStatement("m", "/topic",
msg_filter=msg_filter, msg_type="pkg/Nested")
# Publish statement for "/topic2" with a ("<", literal "100", "ms") time bound
# and no message filter or multiplicity.
hpl_publish = HplPublishStatement("out", "/topic2",
time_bound=HplTimeBound("<", HplLiteral("100", int, ()), "ms"),
msg_filter=None, mult=None, msg_type="pkg/Nested")
# Property combining the publish statement with the receive statement.
hpl_property = HplProperty(hpl_publish, receive_stmt=hpl_receive)
# Topic name -> message type name mappings passed to the generator.
publishers = {"/topic2": "pkg/Nested"}
subscribers = {"/topic": "pkg/Nested"}
test_gen = HplTestGenerator(
["pkg/launch/minimal.launch"],
["/node1", "/node2"],
publishers, subscribers, TEST_DATA)
# Python 2 print statement: writes whatever gen() returns for the property.
print test_gen.gen(hpl_property)
def _python(self, value):
    """Render ``value`` as a string.

    Handled kinds, tested in this order:
      * ``self.Reference``: ``root`` followed by "." and
        ``expr.last_name`` when that name is truthy, else ``root`` alone;
      * ``HplLiteral``: ``repr`` of the wrapped ``value``;
      * ``tuple``: every item rendered recursively, joined with ", " and
        enclosed in parentheses;
      * ``HplFieldExpression``: ``self._var_prefix`` plus ``full_name``.

    Any other kind trips the assertion at the end.
    """
    # The reference test has to run before the tuple test.
    if isinstance(value, self.Reference):
        last_name = value.expr.last_name
        if last_name:
            return value.root + "." + last_name
        return value.root
    if isinstance(value, HplLiteral):
        return repr(value.value)
    if isinstance(value, tuple):
        rendered = [self._python(item) for item in value]
        return "({})".format(", ".join(rendered))
    assert isinstance(value, HplFieldExpression), "found: " + repr(value)
    return self._var_prefix + value.full_name
def _add_node(self, config, futures, name, datum):
    """Create a node instance from a hint entry and queue its future links.

    Args:
        config: object whose ``nodes.get(name)`` is used for the existence
            check and whose ``name`` appears in the warning.
        futures: list that receives the ``FutureNodeLinks`` created for
            the new instance.
        name: node name; used for the lookup and to build the ``RosName``.
        datum: dict describing the node. "node_type" is required and is
            split once on "/" into package and executable. "args",
            "remaps", "traceability", "publishers", "subscribers",
            "servers", "clients", "getters" and "setters" are optional.

    Returns:
        False when ``config.nodes`` already holds an entry for ``name``
        (a warning is logged and nothing is created), True otherwise.

    Raises:
        KeyError: ``datum`` has no "node_type" key.
        ValueError: "node_type" contains no "/" separator.
    """
    if config.nodes.get(name) is not None:
        self.log.warning(self._W_EXISTS, config.name, "node", name)
        return False
    pkg, exe = datum["node_type"].split("/", 1)
    args = datum.get("args", "")
    remaps = datum.get("remaps", {})
    node = self.find_node(pkg, exe, args)
    rosname = RosName(name)
    self.log.debug("[hints] Creating NodeInstance %s for Node %s.",
                   rosname.full, node.name)
    instance = NodeInstance(self.configuration, rosname, node,
                            argv=args, remaps=remaps)
    instance.location2 = JSON_to_loc2(datum.get("traceability"))
    node.instances.append(instance)
    # NOTE(review): the existence check above reads `config.nodes`, while the
    # instance is registered in `self.configuration.nodes` -- confirm both
    # refer to the same configuration.
    # The value returned by add() was never used, so it is not bound.
    self.configuration.nodes.add(instance)
    fnl = FutureNodeLinks(instance)
    fnl.advertise.extend(datum.get("publishers", ()))
    fnl.subscribe.extend(datum.get("subscribers", ()))
    fnl.service.extend(datum.get("servers", ()))
    fnl.client.extend(datum.get("clients", ()))
    fnl.read_param.extend(datum.get("getters", ()))
    fnl.write_param.extend(datum.get("setters", ()))
    futures.append(fnl)
    return True