How to use the bonsai.analysis.get_conditions function in bonsai

To help you get started, we’ve selected a few bonsai examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github git-afsantos / haros / haros / extractor.py View on Github external
def _on_read_param(self, node, ns, call):
    """Register a parameter read found at ``call`` on ``node``.

    Calls with no arguments are ignored. Otherwise a
    ``ReadParameterCall`` is built from the name extracted from the
    call, the namespace ``ns``, the call location, the value of
    ``get_control_depth``, one ``SourceCondition`` per item returned by
    ``get_conditions``, and the value of ``is_under_loop``; it is then
    appended to ``node.read_param`` and reported on the debug log.
    """
    if len(call.arguments) == 0:
        return
    param_name = self._extract_topic(call)
    nesting = get_control_depth(call, recursive=True)
    where = self._call_location(call)
    guards = []
    for cond in get_conditions(call, recursive=True):
        guards.append(SourceCondition(pretty_str(cond), location=where))
    looped = is_under_loop(call, recursive=True)
    # The third positional argument is passed as None here.
    entry = ReadParameterCall(param_name, ns, None, location=where,
                              control_depth=nesting, conditions=guards,
                              repeats=looped)
    node.read_param.append(entry)
    self.log.debug("Found Read on %s/%s (%s)", ns, param_name, "string")
github git-afsantos / haros / haros / extractor.py View on Github external
def _on_write_param(self, node, ns, call):
    """Register a parameter write found at ``call`` on ``node``.

    Calls with no arguments are ignored. Otherwise a
    ``WriteParameterCall`` is built from the name extracted from the
    call, the namespace ``ns``, the call location, the value of
    ``get_control_depth``, one ``SourceCondition`` per item returned by
    ``get_conditions``, and the value of ``is_under_loop``; it is then
    appended to ``node.write_param`` and reported on the debug log.
    """
    if len(call.arguments) == 0:
        return
    param_name = self._extract_topic(call)
    nesting = get_control_depth(call, recursive=True)
    where = self._call_location(call)
    guards = []
    for cond in get_conditions(call, recursive=True):
        guards.append(SourceCondition(pretty_str(cond), location=where))
    looped = is_under_loop(call, recursive=True)
    # The third positional argument is passed as None here.
    entry = WriteParameterCall(param_name, ns, None, location=where,
                               control_depth=nesting, conditions=guards,
                               repeats=looped)
    node.write_param.append(entry)
    self.log.debug("Found Write on %s/%s (%s)", ns, param_name, "string")
github git-afsantos / haros / haros / extractor.py View on Github external
def _on_subscription(self, node, ns, call, topic_pos=0, queue_pos=1,
                     msg_type=None):
    """Register a subscription found at ``call`` on ``node``.

    Calls with fewer than two arguments are ignored. ``topic_pos`` and
    ``queue_pos`` are forwarded to the topic and queue-size extractors.
    When ``msg_type`` is falsy, it is obtained from
    ``_extract_message_type`` instead. The resulting ``Subscription``
    is appended to ``node.subscribe`` and reported on the debug log.
    """
    if len(call.arguments) < 2:
        return
    topic = self._extract_topic(call, topic_pos=topic_pos)
    if not msg_type:
        # No usable type supplied by the caller: derive it from the call.
        msg_type = self._extract_message_type(call)
    queue_len = self._extract_queue_size(call, queue_pos=queue_pos)
    nesting = get_control_depth(call, recursive=True)
    where = self._call_location(call)
    guards = []
    for cond in get_conditions(call, recursive=True):
        guards.append(SourceCondition(pretty_str(cond), location=where))
    looped = is_under_loop(call, recursive=True)
    entry = Subscription(topic, ns, msg_type, queue_len, location=where,
                         control_depth=nesting, conditions=guards,
                         repeats=looped)
    node.subscribe.append(entry)
    self.log.debug("Found Subscription on %s/%s (%s)", ns, topic, msg_type)
github git-afsantos / haros / haros / extractor.py View on Github external
def _on_client(self, node, call):
    """Register a service client found at ``call`` on ``node``.

    Nothing happens when ``self.invalid_call(call)`` is truthy.
    Otherwise the namespace and name come from ``_extract_topic``, the
    message type from ``_extract_message_type`` (queried with
    ``'service_class'`` and ``self.msgs_list``), and a
    ``ServiceClientCall`` is appended to ``node.client`` and reported
    on the debug log.
    """
    if self.invalid_call(call):
        return

    namespace, service = self._extract_topic(call)
    srv_type = self._extract_message_type(call, 'service_class',
                                          self.msgs_list)
    nesting = get_control_depth(call, recursive=True)
    where = self._call_location(call)
    guards = []
    for cond in get_conditions(call, recursive=True):
        guards.append(SourceCondition(pretty_str(cond), location=where))
    looped = is_under_loop(call, recursive=True)
    entry = ServiceClientCall(service, namespace, srv_type, location=where,
                              control_depth=nesting, conditions=guards,
                              repeats=looped)
    node.client.append(entry)
    self.log.debug("Found Client on %s/%s (%s)", namespace, service, srv_type)
github git-afsantos / haros / haros / extractor.py View on Github external
def _on_service(self, node, ns, call):
    """Register a service server found at ``call`` on ``node``.

    Calls with fewer than two arguments are ignored. Otherwise a
    ``ServiceServerCall`` is built from the extracted name and message
    type, the namespace ``ns``, the call location, the value of
    ``get_control_depth``, one ``SourceCondition`` per item returned by
    ``get_conditions``, and the value of ``is_under_loop``; it is then
    appended to ``node.service`` and reported on the debug log.
    """
    if len(call.arguments) < 2:
        return
    service = self._extract_topic(call)
    srv_type = self._extract_message_type(call)
    nesting = get_control_depth(call, recursive=True)
    where = self._call_location(call)
    guards = []
    for cond in get_conditions(call, recursive=True):
        guards.append(SourceCondition(pretty_str(cond), location=where))
    looped = is_under_loop(call, recursive=True)
    entry = ServiceServerCall(service, ns, srv_type, location=where,
                              control_depth=nesting, conditions=guards,
                              repeats=looped)
    node.service.append(entry)
    self.log.debug("Found Service on %s/%s (%s)", ns, service, srv_type)
github git-afsantos / haros / haros / extractor.py View on Github external
def _on_subscription(self, node, call):
    """Register a subscription found at ``call`` on ``node``.

    Nothing happens when ``self.invalid_call(call)`` is truthy.
    Otherwise the namespace and name come from ``_extract_topic``, the
    message type from ``_extract_message_type`` (queried with
    ``'data_class'`` and ``self.msgs_list``), the queue size from
    ``_extract_queue_size``, and a ``Subscription`` is appended to
    ``node.subscribe`` and reported on the debug log.
    """
    if self.invalid_call(call):
        return
    namespace, topic = self._extract_topic(call)
    data_type = self._extract_message_type(call, 'data_class',
                                           self.msgs_list)
    queue_len = self._extract_queue_size(call)
    nesting = get_control_depth(call, recursive=True)
    where = self._call_location(call)
    guards = []
    for cond in get_conditions(call, recursive=True):
        guards.append(SourceCondition(pretty_str(cond), location=where))
    looped = is_under_loop(call, recursive=True)
    entry = Subscription(topic, namespace, data_type, queue_len,
                         location=where, control_depth=nesting,
                         conditions=guards, repeats=looped)
    node.subscribe.append(entry)
    self.log.debug("Found Subscription on %s/%s (%s)", namespace, topic, data_type)
github git-afsantos / haros / haros / extractor.py View on Github external
def _on_publication(self, node, ns, call, topic_pos=0, queue_pos=1,
                    msg_type=None):
    """Register a publication found at ``call`` on ``node``.

    Calls with fewer than two arguments are ignored. ``topic_pos`` and
    ``queue_pos`` are forwarded to the topic and queue-size extractors.
    When ``msg_type`` is falsy, it is obtained from
    ``_extract_message_type`` instead. The resulting ``Publication`` is
    appended to ``node.advertise`` and reported on the debug log.
    """
    if len(call.arguments) < 2:
        return
    topic = self._extract_topic(call, topic_pos=topic_pos)
    if not msg_type:
        # No usable type supplied by the caller: derive it from the call.
        msg_type = self._extract_message_type(call)
    queue_len = self._extract_queue_size(call, queue_pos=queue_pos)
    nesting = get_control_depth(call, recursive=True)
    where = self._call_location(call)
    guards = []
    for cond in get_conditions(call, recursive=True):
        guards.append(SourceCondition(pretty_str(cond), location=where))
    looped = is_under_loop(call, recursive=True)
    entry = Publication(topic, ns, msg_type, queue_len, location=where,
                        control_depth=nesting, conditions=guards,
                        repeats=looped)
    node.advertise.append(entry)
    self.log.debug("Found Publication on %s/%s (%s)", ns, topic, msg_type)