Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# pass in config to class as a property so that test_parent can be initialized
classdict = { 'setUp': setUp, 'tearDown': tearDown, 'config': config,
'test_parent': None, 'test_file': test_file,
'reuse_master': reuse_master, 'clear': clear }
# add in the tests
testNames = []
for test in config.tests:
# #1989: find test first to make sure it exists and is executable
err_msg = None
try:
rp = rospkg.RosPack()
cmd = roslib.packages.find_node(test.package, test.type, rp)
if not cmd:
err_msg = "Test node [%s/%s] does not exist or is not executable"%(test.package, test.type)
except rospkg.ResourceNotFound as e:
err_msg = "Package [%s] for test node [%s/%s] does not exist"%(test.package, test.package, test.type)
testName = 'test%s'%(test.test_name)
if err_msg:
classdict[testName] = failRunner(test.test_name, err_msg)
elif testName in testNames:
classdict[testName] = failDuplicateRunner(test.test_name)
else:
classdict[testName] = rostestRunner(test, pkg, results_base_dir=results_base_dir)
testNames.append(testName)
# instantiate the TestCase instance with our magically-created tests
return type('RosTest',(unittest.TestCase,),classdict)
path,
os.pathsep,
os.environ['ROS_PACKAGE_PATH']
)
pkgs = find_catkin_packages_in(path, options.verbose)
packages.extend(pkgs)
# Make packages list unique
packages = list(set(packages))
else:
rospack = rospkg.RosPack()
rosstack = rospkg.RosStack()
val = rospkg.expand_to_packages(args, rospack, rosstack)
packages = val[0]
not_found = val[1]
if not_found:
raise rospkg.ResourceNotFound(not_found[0], rospack.get_ros_paths())
# Handle the --ignore-src option
if command in ['install', 'check', 'keys'] and options.ignore_src:
if options.verbose:
print('Searching ROS_PACKAGE_PATH for '
'sources: ' + str(os.environ['ROS_PACKAGE_PATH'].split(':')))
ws_pkgs = get_workspace_packages()
for path in os.environ['ROS_PACKAGE_PATH'].split(':'):
path = os.path.abspath(path.strip())
if os.path.exists(path):
pkgs = find_catkin_packages_in(path, options.verbose)
ws_pkgs.extend(pkgs)
elif options.verbose:
print('Skipping non-existent path ' + path)
set_workspace_packages(ws_pkgs)
elif command == 'packages':
rosmsg_cmd_packages(ext, full)
elif command == 'list':
rosmsg_cmd_list(ext, full)
elif command == 'md5':
rosmsg_cmd_md5(ext, full)
elif command == '--help':
print(fullusage(mode))
sys.exit(0)
else:
print(fullusage(mode))
sys.exit(getattr(os, 'EX_USAGE', 1))
except KeyError as e:
print("Unknown message type: %s"%e, file=sys.stderr)
sys.exit(getattr(os, 'EX_USAGE', 1))
except rospkg.ResourceNotFound as e:
print("Invalid package: %s"%e, file=sys.stderr)
sys.exit(getattr(os, 'EX_USAGE', 1))
except ValueError as e:
print("Invalid type: '%s'"%e, file=sys.stderr)
sys.exit(getattr(os, 'EX_USAGE', 1))
except ROSMsgException as e:
print(str(e), file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
pass
pkg_path = rospkg.RosPack().get_path(self._ros_pkg)
return uri.replace(result[0], pkg_path + '/')
elif '$(find' in uri:
uri_temp = uri.replace('$', '')
result = re.findall('(find \w+)', uri_temp)
if len(result) == 0:
msg = 'Invalid package path for provided mesh uri {}'.format(uri)
PCG_ROOT_LOGGER.error(msg)
raise ValueError(msg)
self._ros_pkg = result[0].split()[1]
if self._ros_pkg not in rospkg.RosPack().list():
msg = 'Package {} was not found, uri={}'.format(self._ros_pkg, uri)
PCG_ROOT_LOGGER.error(msg)
raise rospkg.ResourceNotFound(msg)
pkg_path = rospkg.RosPack().get_path(self._ros_pkg)
return uri_temp.replace('(find {})'.format(self._ros_pkg), pkg_path)
else:
return None
def get_view_key(self, resource_name):
"""
Map *resource_name* to a view key. In rospkg, this maps the
DEFAULT_VIEW_KEY if *resource_name* exists.
:raises: :exc:`rospkg.ResourceNotFound`
"""
if (
resource_name in self.get_catkin_paths() or
resource_name in self.get_loadable_resources()
):
return DEFAULT_VIEW_KEY
else:
raise rospkg.ResourceNotFound(resource_name)
def _load_stack(ctx, stack):
"""
Utility for initializing WtfContext state
@raise WtfException: if context state cannot be initialized
"""
r = ctx.rosstack
ctx.stack = stack
try:
ctx.stacks = [stack] + r.get_depends(stack, implicit=True)
except rospkg.ResourceNotFound as e:
raise WtfException("Cannot load dependencies of stack [%s]: %s"%(stack, e))
try:
ctx.stack_dir = r.get_path(stack)
except rospkg.ResourceNotFound:
raise WtfException("[%s] appears to be a stack, but it's not on your ROS_PACKAGE_PATH"%stack)
:raises: :exc:`rospkg.ResourceNotFound` if *view_key* cannot be located
:raises: :exc:`RosdepInternalError`
"""
if view_key in self._view_cache:
return self._view_cache[view_key]
# lazy-init
self._load_view_dependencies(view_key, self.loader)
# use dependencies to create view
try:
dependencies = self.rosdep_db.get_view_dependencies(view_key)
except KeyError as e:
# convert to ResourceNotFound. This should be decoupled
# in the future
raise ResourceNotFound(str(e.args[0]))
# load views in order
view = self.create_rosdep_view(view_key, dependencies + [view_key], verbose=verbose)
self._view_cache[view_key] = view
return view
# resolve path elements
if isstring(value) and (value.startswith('$')):
value = interpret_path(value)
rospy.logdebug("interpret parameter '%s' to '%s'" % (value, pval))
# add parameter to the multicall
param_server_multi.setParam(rospy.get_name(), pkey, value)
test_ret = _test_value(pkey, value)
if test_ret:
param_errors.extend(test_ret)
r = param_server_multi()
for code, msg, _ in r:
if code != 1:
raise exceptions.StartException("Failed to set parameter: %s" % (msg))
except roslaunch.core.RLException as e:
raise exceptions.StartException(e)
except rospkg.ResourceNotFound as rnf:
raise exceptions.StartException("Failed to set parameter. ResourceNotFound: %s" % (rnf))
except Exception as e:
raise exceptions.StartException("Failed to set parameter. ROS Parameter Server "
"reports: %s\n\n%s" % (e, '\n'.join(param_errors)))
finally:
socket.setdefaulttimeout(None)
return abs_paths, not_found_packages
def _get_package_source_path(pkg_name):
rp = rospkg.RosPack()
try:
pkg_path = rp.get_path(pkg_name)
except rospkg.ResourceNotFound:
print('\033[31m{name} is not found in {path}\033[0m'
.format(name=pkg_name, path=rp.list()))
return
pkg_path = rp.get_path(pkg_name)
return pkg_path