Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _on_publication(self, node, ns, call, topic_pos=0, queue_pos=1,
msg_type=None):
if len(call.arguments) <= 1:
return
name = self._extract_topic(call, topic_pos=topic_pos)
msg_type = msg_type or self._extract_message_type(call)
queue_size = self._extract_queue_size(call, queue_pos=queue_pos)
depth = get_control_depth(call, recursive=True)
location = self._call_location(call)
conditions = [SourceCondition(pretty_str(c), location=location)
for c in get_conditions(call, recursive=True)]
pub = Publication(name, ns, msg_type, queue_size, location=location,
control_depth=depth, conditions=conditions,
repeats=is_under_loop(call, recursive=True))
node.advertise.append(pub)
self.log.debug("Found Publication on %s/%s (%s)", ns, name, msg_type)
def _on_client(self, node, ns, call):
if len(call.arguments) <= 1:
return
name = self._extract_topic(call)
msg_type = self._extract_message_type(call)
depth = get_control_depth(call, recursive=True)
location = self._call_location(call)
conditions = [SourceCondition(pretty_str(c), location=location)
for c in get_conditions(call, recursive=True)]
cli = ServiceClientCall(name, ns, msg_type, location=location,
control_depth=depth, conditions=conditions,
repeats=is_under_loop(call, recursive=True))
node.client.append(cli)
self.log.debug("Found Client on %s/%s (%s)", ns, name, msg_type)
def _on_service(self, node, ns, call):
if len(call.arguments) <= 1:
return
name = self._extract_topic(call)
msg_type = self._extract_message_type(call)
depth = get_control_depth(call, recursive=True)
location = self._call_location(call)
conditions = [SourceCondition(pretty_str(c), location=location)
for c in get_conditions(call, recursive=True)]
srv = ServiceServerCall(name, ns, msg_type, location=location,
control_depth=depth, conditions=conditions,
repeats=is_under_loop(call, recursive=True))
node.service.append(srv)
self.log.debug("Found Service on %s/%s (%s)", ns, name, msg_type)
def _on_read_param(self, node, ns, call):
if len(call.arguments) < 1:
return
name = self._extract_topic(call)
depth = get_control_depth(call, recursive = True)
location = self._call_location(call)
conditions = [SourceCondition(pretty_str(c), location = location)
for c in get_conditions(call, recursive = True)]
read = ReadParameterCall(name, ns, None, location = location,
control_depth = depth, conditions = conditions,
repeats = is_under_loop(call, recursive = True))
node.read_param.append(read)
self.log.debug("Found Read on %s/%s (%s)", ns, name, "string")
def _on_write_param(self, node, ns, call):
if len(call.arguments) < 1:
return
name = self._extract_topic(call)
depth = get_control_depth(call, recursive=True)
location = self._call_location(call)
conditions = [SourceCondition(pretty_str(c), location=location)
for c in get_conditions(call, recursive=True)]
wrt = WriteParameterCall(name, ns, None, location=location,
control_depth=depth, conditions=conditions,
repeats=is_under_loop(call, recursive=True))
node.write_param.append(wrt)
self.log.debug("Found Write on %s/%s (%s)", ns, name, "string")
def _on_subscription(self, node, call):
if self.invalid_call(call):
return
ns, name = self._extract_topic(call)
msg_type = self._extract_message_type(call, 'data_class', self.msgs_list)
queue_size = self._extract_queue_size(call)
depth = get_control_depth(call, recursive=True)
location = self._call_location(call)
conditions = [SourceCondition(pretty_str(c), location=location)
for c in get_conditions(call, recursive=True)]
sub = Subscription(name, ns, msg_type, queue_size, location=location,
control_depth=depth, conditions=conditions,
repeats=is_under_loop(call, recursive=True))
node.subscribe.append(sub)
self.log.debug("Found Subscription on %s/%s (%s)", ns, name, msg_type)
def _on_client(self, node, call):
if self.invalid_call(call):
return
ns, name = self._extract_topic(call)
msg_type = self._extract_message_type(call, 'service_class', self.msgs_list)
depth = get_control_depth(call, recursive=True)
location = self._call_location(call)
conditions = [SourceCondition(pretty_str(c), location=location)
for c in get_conditions(call, recursive=True)]
cli = ServiceClientCall(name, ns, msg_type, location=location,
control_depth=depth, conditions=conditions,
repeats=is_under_loop(call, recursive=True))
node.client.append(cli)
self.log.debug("Found Client on %s/%s (%s)", ns, name, msg_type)