Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _setup_path(self):
setup_file = os.path.join(self.package.path, 'setup.py')
if not os.path.isfile(setup_file):
return []
parser = PyAstParser(workspace=self.package.path)
setup = parser.parse(setup_file)
setup_call = (CodeQuery(setup).all_calls
.where_name('generate_distutils_setup')
.get()
or
CodeQuery(setup).all_calls
.where_name('setup')
.get())[0]
package_dir = self.get_arg(setup_call, 0, 'package_dir')
if hasattr(package_dir, 'value'):
package_dir = {
keyword.name: keyword.value
for keyword in self.get_arg(setup_call, 0, 'package_dir').value
}
else:
src_path = os.path.join(self.package.path, 'src')
package_dir = {'': 'src'} if os.path.exists(src_path) else {}
root = package_dir.get('', '')
return [os.path.join(self.package.path, root)]
def _query_nh_param_primitives(self, node, gs):
nh_prefix = "c:@N@ros@S@NodeHandle@"
reads = ("getParam", "getParamCached", "param", "hasParam",
"searchParam")
for call in CodeQuery(gs).all_calls.where_name(reads).get():
if (call.full_name.startswith("ros::NodeHandle")
or (isinstance(call.reference, str)
and call.reference.startswith(nh_prefix))):
self._on_read_param(node, self._resolve_node_handle(call),
call)
writes = ("setParam", "deleteParam")
for call in CodeQuery(gs).all_calls.where_name(writes).get():
if (call.full_name.startswith("ros::NodeHandle")
or (isinstance(call.reference, str)
and call.reference.startswith(nh_prefix))):
self._on_write_param(node, self._resolve_node_handle(call),
call)
msg_type = analysis._extract_message_type(call)
queue_size = analysis._extract_queue_size(call)
pub = publisher(name, msg_type)
rosmodel.pubs.append(pub)
for call in (CodeQuery(gs).all_calls.where_name("subscribe").where_result("ros::Subscriber").get()):
name = analysis._extract_topic(call)
msg_type = analysis._extract_message_type(call)
queue_size = analysis._extract_queue_size(call)
sub = subscriber(name, msg_type)
rosmodel.subs.append(sub)
for call in (CodeQuery(gs).all_calls.where_name("advertiseService").where_result("ros::ServiceServer").get()):
name = analysis._extract_topic(call)
srv_type = analysis._extract_message_type(call)
srv_server = service_server(name, srv_type)
rosmodel.srvsrvs.append(srv_server)
for call in (CodeQuery(gs).all_calls.where_name("serviceClient").where_result("ros::ServiceClient").get()):
name = analysis._extract_topic(call)
srv_type = analysis._extract_message_type(call)
srv_client = service_client(name, srv_type)
rosmodel.srvcls.append(srv_client)
def parse_arg(self):
def _query_param_primitives(self, node, gs):
ros_prefix = "c:@N@ros@N@param@"
reads = ("get", "getCached", "param", "has")
for call in CodeQuery(gs).all_calls.where_name(reads).get():
if (call.full_name.startswith("ros::param")
or (isinstance(call.reference, str)
and call.reference.startswith(ros_prefix))):
self._on_read_param(node, "", call)
for call in (CodeQuery(gs).all_calls.where_name("search")
.where_result("bool").get()):
if (call.full_name.startswith("ros::param")
or (isinstance(call.reference, str)
and call.reference.startswith(ros_prefix))):
if len(call.arguments) > 2:
ns = resolve_expression(call.arguments[0])
if not isinstance(ns, basestring):
ns = "?"
else:
ns = "~"
self._on_read_param(node, ns, call)
def extract_primitives(self, node, gs, analysis, rosmodel):
for call in (CodeQuery(gs).all_calls.where_name("advertise").where_result("ros::Publisher").get()):
name = analysis._extract_topic(call)
msg_type = analysis._extract_message_type(call)
queue_size = analysis._extract_queue_size(call)
pub = publisher(name, msg_type)
rosmodel.pubs.append(pub)
for call in (CodeQuery(gs).all_calls.where_name("subscribe").where_result("ros::Subscriber").get()):
name = analysis._extract_topic(call)
msg_type = analysis._extract_message_type(call)
queue_size = analysis._extract_queue_size(call)
sub = subscriber(name, msg_type)
rosmodel.subs.append(sub)
for call in (CodeQuery(gs).all_calls.where_name("advertiseService").where_result("ros::ServiceServer").get()):
name = analysis._extract_topic(call)
srv_type = analysis._extract_message_type(call)
srv_server = service_server(name, srv_type)
rosmodel.srvsrvs.append(srv_server)
for call in (CodeQuery(gs).all_calls.where_name("serviceClient").where_result("ros::ServiceClient").get()):
name = analysis._extract_topic(call)
srv_type = analysis._extract_message_type(call)
srv_client = service_client(name, srv_type)
rosmodel.srvcls.append(srv_client)
def parse_arg(self):
msg_type = "sensor_msgs/Image")
self.log.debug("Looking for message_filters::Subscriber calls.")
for call in CodeQuery(gs).all_calls.where_name("Subscriber").get():
self.log.debug("Found: %s", call.pretty_str())
self.log.debug("%s", type(call))
self.log.debug("%s", call.__dict__)
if isinstance(call.reference, str):
if not call.reference.startswith("c:@N@message_filters@S@Subscriber"):
continue
if not "message_filters::Subscriber" in call.canonical_type:
continue
n = call.arguments[0] if call.arguments else None
self._on_subscription(node, self._resolve_node_handle(n),
call, topic_pos = 1, queue_pos = 2)
self.log.debug("Looking for image_transport::Subscriber calls.")
for call in CodeQuery(gs).all_calls.where_name("subscribe").get():
if call.canonical_type != "image_transport::Subscriber":
continue
self.log.debug("Found: %s", call.pretty_str())
self.log.debug("%s", type(call))
self.log.debug("%s", call.__dict__)
n = call.method_of if call.method_of else None
self._on_subscription(node, self._resolve_it_node_handle(n),
call, msg_type = "sensor_msgs/Image")
self.log.debug("Looking for image_transport::Publisher.")
for call in CodeQuery(gs).all_calls.where_name("advertise").get():
if call.canonical_type != "image_transport::Publisher":
continue
self.log.debug("Found: %s", call.pretty_str())
self.log.debug("%s", type(call))
self.log.debug("%s", call.__dict__)
n = call.method_of if call.method_of else None
if call.canonical_type != "ros::Subscriber":
continue
self._on_subscription(node,
self._resolve_node_handle(call.method_of), call)
for call in CodeQuery(gs).all_calls.where_name("advertiseService").get():
if call.canonical_type != "ros::ServiceServer":
continue
self._on_service(node,
self._resolve_node_handle(call.method_of), call)
for call in CodeQuery(gs).all_calls.where_name("serviceClient").get():
if call.canonical_type != "ros::ServiceClient":
continue
self._on_client(node,
self._resolve_node_handle(call.method_of), call)
self.log.debug("Looking for image_transport::SubscriberFilter calls.")
for call in CodeQuery(gs).all_calls.where_name("SubscriberFilter").get():
self.log.debug("Found: %s", call.pretty_str())
self.log.debug("%s", type(call))
self.log.debug("%s", call.__dict__)
if isinstance(call.reference, str):
if not call.reference.startswith("c:@N@image_transport@S@SubscriberFilter"):
continue
if not "image_transport::SubscriberFilter" in call.canonical_type:
continue
n = call.arguments[0] if call.arguments else None
self._on_subscription(node, self._resolve_it_node_handle(n),
call, topic_pos = 1, queue_pos = 2,
msg_type = "sensor_msgs/Image")
self.log.debug("Looking for message_filters::Subscriber calls.")
for call in CodeQuery(gs).all_calls.where_name("Subscriber").get():
self.log.debug("Found: %s", call.pretty_str())
self.log.debug("%s", type(call))