Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _extract_action(self, call):
    """Return the action name passed to a SimpleActionServer/Client call.

    Server case: when ``call.canonical_type`` mentions
    ``SimpleActionServer`` and the call has more than two arguments, the
    second argument is used. If it is not already a string it is first
    passed through ``resolve_expression``. The name is the last
    whitespace-separated token of that string, with single quotes removed.

    Client case: when ``call.canonical_type`` mentions
    ``SimpleActionClient`` and the call has more than one argument, the
    first argument is returned unchanged if it is a string.

    Returns "?" whenever no name can be determined, including when the
    server argument is an empty or whitespace-only string.
    """
    name = "?"
    is_server = "SimpleActionServer" in call.canonical_type
    if is_server and len(call.arguments) > 2:
        arg = call.arguments[1]
        if not isinstance(arg, basestring):
            arg = resolve_expression(arg)
        if isinstance(arg, basestring):
            tokens = arg.split()
            # A blank string yields no tokens; indexing [-1] on the empty
            # list would raise IndexError, so keep the "?" placeholder.
            if tokens:
                name = tokens[-1].replace("'", "")
    elif ("SimpleActionClient" in call.canonical_type
            and len(call.arguments) > 1):
        if isinstance(call.arguments[0], basestring):
            name = call.arguments[0]
    return name
def _query_param_primitives(self, node, gs):
    """Report calls to ``ros::param`` functions found through ``CodeQuery(gs)``.

    Three queries are run in order:

    * calls named ``get``, ``getCached``, ``param`` or ``has`` are passed to
      ``self._on_read_param(node, "", call)``;
    * calls named ``search`` with result ``bool`` are passed to
      ``self._on_read_param(node, ns, call)``, where ``ns`` is the resolved
      first argument when the call has more than two arguments ("?" if it
      does not resolve to a string) and "~" otherwise;
    * calls named ``set`` or ``del`` are passed to
      ``self._on_write_param(node, "", call)``.

    In every case a call is only reported when its ``full_name`` starts with
    ``ros::param`` or its ``reference`` is a ``str`` starting with
    ``c:@N@ros@N@param@``.
    """
    # NOTE(review): looks like a Clang USR prefix for ros::param -- confirm.
    reference_prefix = "c:@N@ros@N@param@"

    def is_param_call(candidate):
        # Match by qualified name first, then by the reference string.
        if candidate.full_name.startswith("ros::param"):
            return True
        return (isinstance(candidate.reference, str)
                and candidate.reference.startswith(reference_prefix))

    read_names = ("get", "getCached", "param", "has")
    for call in CodeQuery(gs).all_calls.where_name(read_names).get():
        if is_param_call(call):
            self._on_read_param(node, "", call)

    search_query = CodeQuery(gs).all_calls.where_name("search")
    for call in search_query.where_result("bool").get():
        if not is_param_call(call):
            continue
        # Default used when the call has at most two arguments.
        ns = "~"
        if len(call.arguments) > 2:
            ns = resolve_expression(call.arguments[0])
            if not isinstance(ns, basestring):
                ns = "?"
        self._on_read_param(node, ns, call)

    write_names = ("set", "del")
    for call in CodeQuery(gs).all_calls.where_name(write_names).get():
        if is_param_call(call):
            self._on_write_param(node, "", call)
def _extract_topic(self, call, topic_pos=0):
    """Return the topic name found at ``call.arguments[topic_pos]``.

    The argument is passed through ``resolve_expression``. The result is
    returned only when it is a non-empty string; otherwise "?" is returned.
    """
    resolved = resolve_expression(call.arguments[topic_pos])
    if isinstance(resolved, basestring) and resolved:
        return resolved
    # Not a string, or an empty string: the topic is unknown.
    return "?"
def _extract_topic(cls, call):
    """Return ``cls.split_ns_name`` applied to the topic name of *call*.

    The topic argument is fetched with ``cls.get_arg(call, 0, 'name')`` and
    passed through ``resolve_expression``. When the result is not a string,
    '?' is used in its place.
    """
    topic_arg = cls.get_arg(call, 0, 'name')
    resolved = resolve_expression(topic_arg)
    topic = resolved if isinstance(resolved, basestring) else '?'
    return cls.split_ns_name(topic)
def _extract_queue_size(cls, call):
    """Return the numeric queue size passed to *call*, or None.

    The argument position is looked up in ``cls.queue_size_pos`` using the
    lower-cased call name, and the argument itself is fetched with
    ``cls.get_arg(call, pos, 'queue_size')``. The argument is then passed
    through ``resolve_expression``.

    Returns the resolved value when it is an ``int``, ``long`` or ``float``,
    and None otherwise. The lookup in ``cls.queue_size_pos`` is not guarded,
    so a call name missing from it propagates that lookup's error.
    """
    pos = cls.queue_size_pos[call.name.lower()]
    queue_size_arg = cls.get_arg(call, pos, 'queue_size')
    try:
        queue_size = resolve_expression(queue_size_arg)
    except AssertionError:
        # Kept from the original: an AssertionError escaping the resolver
        # still yields None instead of propagating.
        return None
    # Explicit check instead of `assert`: assert statements are removed
    # under `python -O`, which would let non-numeric values through.
    if isinstance(queue_size, (int, long, float)):
        return queue_size
    return None
def _resolve_it_node_handle(self, value):
    """Resolve the node handle given to an ``ImageTransport`` call.

    *value* is passed through ``resolve_expression``. When the result is a
    ``CppFunctionCall`` named ``ImageTransport``, its first argument is
    handed to ``self._resolve_node_handle`` and that result is returned.
    In every other case "?" is returned.
    """
    resolved = resolve_expression(value)
    is_image_transport = (isinstance(resolved, CppFunctionCall)
                          and resolved.name == "ImageTransport")
    if not is_image_transport:
        return "?"
    return self._resolve_node_handle(resolved.arguments[0])