Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def launch(self):
    """Set up the parser and node-extraction objects for a single node.

    Parses the command-line arguments and, when ``self.args.node`` is set,
    derives the catkin workspace from ``ROS_PACKAGE_PATH``, locates the
    requested package through ``rospkg`` and configures the C++ AST parser.

    Raises:
        RuntimeError: if ``ROS_PACKAGE_PATH`` is unset/empty or contains no
            entry ending in ``/src``, so no workspace can be derived.

    NOTE(review): the visible body looks cut off after ``bindir``; the locals
    assigned at the end are kept because code further down presumably uses
    them.
    """
    self.parse_arg()
    if self.args.node:
        #FIND WORKSPACE:
        #Fixme: the env variable ROS_PACKAGE_PATH is not the best way to extract the workspace path
        ros_pkg_path = os.environ.get("ROS_PACKAGE_PATH")
        if not ros_pkg_path:
            raise RuntimeError(
                "ROS_PACKAGE_PATH is not set; cannot locate the workspace")
        # Appending ":" lets a trailing ".../src" entry match as well. When
        # the marker was already present the index found is unchanged, so
        # previously working setups resolve to the same workspace.
        marker = (ros_pkg_path + ":").find("/src:")
        if marker < 0:
            # The original sliced with -1 here, silently dropping the last
            # character of the path and yielding a bogus workspace.
            raise RuntimeError(
                "ROS_PACKAGE_PATH has no entry ending in '/src': %s"
                % ros_pkg_path)
        ws = ros_pkg_path[:marker]
        #FIND PACKAGE PATH
        pkg = Package(self.args.package_name)
        rospack = rospkg.RosPack()
        pkg.path = rospack.get_path(self.args.package_name)
        #BONSAI PARSER
        parser = CppAstParser(workspace = ws)
        parser.set_library_path("/usr/lib/llvm-3.8/lib")
        parser.set_standard_includes("/usr/lib/llvm-3.8/lib/clang/3.8.0/include")
        db_dir = os.path.join(ws, "build")
        # Only register the compilation database when one exists in build/.
        if os.path.isfile(os.path.join(db_dir, "compile_commands.json")):
            parser.set_database(db_dir)
        #HAROS NODE EXTRACTOR
        analysis = NodeExtractor(pkg, env=dict(os.environ), ws=ws ,node_cache=False, parse_nodes=True)
        node = Node(self.args.name, pkg, rosname=RosName(self.args.name))
        srcdir = pkg.path[len(ws):]
        srcdir = os.path.join(ws, srcdir.split(os.sep, 1)[0])
        bindir = os.path.join(ws, "build")
# NOTE(review): excerpt from the middle of a RosName/Node equality check; `n1`
# is read below before any assignment visible here, so it is presumably bound
# just above this excerpt (the asserts expect it to equal "/a").
n2 = RosName("a", "ns")
assert n1 != n2
# RosName objects are compared against plain strings in both operand orders.
assert n1 == "/a"
assert "/a" == n1
assert n2 == "ns/a"
assert "ns/a" == n2
# Two RosName objects built from the same arguments are expected to be equal.
n1 = RosName("a", "ns")
assert n1 == n2
n1 = RosName("a")
n2 = RosName("a")
assert n1 == n2
# A "~"-prefixed name built with a third argument is expected to equal
# "priv/a" and therefore to differ from the "ns"-qualified name.
n1 = RosName("~a", "ns", "priv")
n2 = RosName("a", "ns")
assert n1 != n2
assert n1 == "priv/a"
# A Node's rosname built from "base" is expected to compare equal to "/base".
n = Node("base", Package("pkg"), rosname = RosName("base"))
assert n.rosname == "/base"
if __name__ == "__main__":
    # Ad-hoc smoke test: register a handful of packages with known
    # dependencies, run the topological sort and print the result.
    data = DataManager()
# ----- test data -------------------------------------------------------------
    # (package name, dependencies in the order they are added)
    fixture = (
        ("A", ("X",)),
        ("B", ("A",)),
        ("C", ("X", "A")),
        ("D", ("A", "C")),
        ("E", ()),
        ("F", ("E",)),
    )
    for pkg_name, deps in fixture:
        pkg = Package(pkg_name)
        for dep in deps:
            pkg.dependencies.add(dep)
        data.packages[pkg.id] = pkg
# ----- topological sort test -------------------------------------------------
    data._topological_sort()
    print(data._topological_packages)
# NOTE(review): tail of a node-loading routine whose header lies outside this
# excerpt; `cls`, `pkg`, `node_type` and `node_id` are bound above.
# Presumably guarded by a cache-membership check that is not visible here.
return cls._cache[node_id]
# The model file is looked up as "<pkg>.yaml" inside cls.model_dir.
filename = os.path.join(cls.model_dir, pkg + ".yaml")
try:
with open(filename) as handle:
data = yaml.safe_load(handle)
except IOError as e:
# Any IOError from opening/reading is reported as "not found"; `e` is unused.
cls.log.debug("YAML file not found: %s", filename)
return None
# NOTE(review): yaml.safe_load returns None for an empty document, in which
# case the membership test below would raise TypeError — confirm whether
# empty model files can occur.
if not cls.distro in data:
cls.log.debug("Package has no data for ROS %s.", cls.distro)
return None
if not node_type in data[cls.distro]:
cls.log.debug("Node does not exist for ROS %s.", cls.distro)
return None
cls.log.debug("Building node from YAML data.")
node = cls._build_node(node_type, cls.distro, Package(pkg), data)
# Remember the built node under node_id so later lookups can reuse it.
cls._cache[node_id] = node
return node
def parse(pkg_file, project = None):
    """Build a Package from the XML manifest at ``pkg_file``.

    The package name is taken from the manifest's ``<name>`` element and the
    package path is the directory containing the manifest. Metadata, export
    and dependency information are then filled in by the PackageParser
    helpers, in that order.

    Args:
        pkg_file: path to the XML manifest file.
        project: optional value forwarded to ``Package`` as ``proj``.

    Returns:
        The populated Package object.
    """
    log = PackageParser.log
    log.debug("PkgParser.parse(%s, %s)", pkg_file, project)
    with open(pkg_file, "r") as manifest:
        xml_root = ET.parse(manifest).getroot()
    pkg_name = xml_root.find("name").text.strip()
    package = Package(pkg_name, proj = project)
    package.path = os.path.dirname(pkg_file)
    log.info("Found package %s at %s", package, package.path)
    # Same three parsing passes as before, applied in the same order.
    passes = (
        PackageParser._parse_metadata,
        PackageParser._parse_export,
        PackageParser._parse_dependencies,
    )
    for run_pass in passes:
        run_pass(xml_root, package)
    return package
# NOTE(review): excerpt from the middle of an indexing routine; the `if` that
# pairs with the `else` below, and the names `index_file`, `repo_path` and
# `index_repos`, are defined outside this excerpt.
with open(index_file, "r") as handle:
# NOTE(review): yaml.load without an explicit Loader can construct arbitrary
# Python objects; prefer yaml.safe_load unless the index file is fully trusted.
data = yaml.load(handle)
else:
# Fallback index with an empty package list.
data = { "packages": [] }
self.project = Project(data.get("project", "default"))
# Step 1: find packages locally
_log.info("Looking for packages locally.")
missing = []
# Packages from the "launch" listing come first, then the explicit "packages" entries.
pkg_list = self._read_launch_listing(data.get("launch"))
pkg_list.extend(data.get("packages", []))
if not pkg_list:
# Nothing requested explicitly: fall back to whatever RosPack lists.
_log.info("Harvesting packages from catkin workspace")
pkg_list = RosPack.get_instance(".").list()
# Deduplicate; iteration order below is therefore unspecified.
pkg_list = set(pkg_list)
# NOTE(review): the loop variable `id` shadows the builtin of the same name.
for id in pkg_list:
pkg = Package.locate_offline(id, repo_path)
if pkg is None:
# Not found locally; remembered so it can be reported/resolved in step 2.
missing.append(id)
else:
SourceFile.populate_package(pkg, self.files)
self.packages[id] = pkg
self.project.packages.append(pkg)
pkg.project = self.project
# Step 2: load repositories only if explicitly told to
_log.debug("Missing packages: %s", missing)
if index_repos:
_log.info("Indexing repositories.")
self.repositories = Repository.load_repositories(
data.get("repositories", {}), pkg_list)
repos = set()
# dict.iteritems() exists only on Python 2.
for _, repo in self.repositories.iteritems():
# NOTE(review): the body of this inner loop lies outside this excerpt.
for id in repo.declared_packages:
def load_state(file_path):
    """Load and return the object pickled in the file at ``file_path``.

    Args:
        file_path: path of a pickle file, e.g. one written by ``save_state``.

    Returns:
        Whatever object ``cPickle.load`` reconstructs from the file.

    NOTE(review): unpickling executes code embedded in the file; only load
    state files that come from a trusted source.
    """
    _log.debug("DataManager.load_state(%s)", file_path)
    # Binary mode is required: the state is written with
    # cPickle.HIGHEST_PROTOCOL, a binary format that text mode can corrupt
    # through newline translation on some platforms.
    with open(file_path, "rb") as handle:
        return cPickle.load(handle)
if __name__ == "__main__":
    # Ad-hoc smoke test: register a handful of packages with known
    # dependencies, run the topological sort and print the result.
    data = DataManager()
# ----- test data -------------------------------------------------------------
    # (package name, dependencies in the order they are added)
    fixture = (
        ("A", ("X",)),
        ("B", ("A",)),
        ("C", ("X", "A")),
        ("D", ("A", "C")),
        ("E", ()),
        ("F", ("E",)),
    )
    for pkg_name, deps in fixture:
        pkg = Package(pkg_name)
        for dep in deps:
            pkg.dependencies.add(dep)
        data.packages[pkg.id] = pkg
# ----- topological sort test -------------------------------------------------
    data._topological_sort()
    print(data._topological_packages)
# NOTE(review): excerpt starting in the middle of a topological-sort smoke
# test; `pkg` and `data` are bound before this excerpt begins.
pkg.dependencies.add("X")
data.packages[pkg.id] = pkg
# Each following group creates one package, records its dependencies and
# registers it in data.packages under its id.
pkg = Package("B")
pkg.dependencies.add("A")
data.packages[pkg.id] = pkg
pkg = Package("C")
pkg.dependencies.add("X")
pkg.dependencies.add("A")
data.packages[pkg.id] = pkg
pkg = Package("D")
pkg.dependencies.add("A")
pkg.dependencies.add("C")
data.packages[pkg.id] = pkg
# "E" is registered without any dependency.
pkg = Package("E")
data.packages[pkg.id] = pkg
pkg = Package("F")
pkg.dependencies.add("E")
data.packages[pkg.id] = pkg
# ----- topological sort test -------------------------------------------------
data._topological_sort()
# Python 2 print statement.
print data._topological_packages
def save_state(self, file_path):
    """Pickle this object to the file at ``file_path``.

    Args:
        file_path: destination path; the file is created or overwritten.
    """
    _log.debug("DataManager.save_state(%s)", file_path)
    # Binary mode is required: cPickle.HIGHEST_PROTOCOL is a binary format,
    # and a text-mode handle can corrupt it through newline translation on
    # some platforms.
    with open(file_path, "wb") as handle:
        cPickle.dump(self, handle, cPickle.HIGHEST_PROTOCOL)
@staticmethod
def load_state(file_path):
    """Load and return the object pickled in the file at ``file_path``.

    Args:
        file_path: path of a pickle file, e.g. one written by ``save_state``.

    Returns:
        Whatever object ``cPickle.load`` reconstructs from the file.

    NOTE(review): unpickling executes code embedded in the file; only load
    state files that come from a trusted source.
    """
    _log.debug("DataManager.load_state(%s)", file_path)
    # Binary mode is required: the state is written with
    # cPickle.HIGHEST_PROTOCOL, a binary format that text mode can corrupt
    # through newline translation on some platforms.
    with open(file_path, "rb") as handle:
        return cPickle.load(handle)
if __name__ == "__main__":
data = DataManager()
# ----- test data -------------------------------------------------------------
pkg = Package("A")
pkg.dependencies.add("X")
data.packages[pkg.id] = pkg
pkg = Package("B")
pkg.dependencies.add("A")
data.packages[pkg.id] = pkg
pkg = Package("C")
pkg.dependencies.add("X")
pkg.dependencies.add("A")
data.packages[pkg.id] = pkg
pkg = Package("D")
pkg.dependencies.add("A")
pkg.dependencies.add("C")
data.packages[pkg.id] = pkg
pkg = Package("E")
data.packages[pkg.id] = pkg
pkg = Package("F")