Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
parser.add_option("-o", "--output-file",
dest="output_file", default=None,
help="file to store test results in", metavar="FILE")
options, args = parser.parse_args()
if not args:
parser.error('please specify a file or directory for roslaunch files to test')
roslaunch_path = args[0]
# #2590: implementing this quick and dirty as this script should only be used by higher level tools
env_vars = args[1:]
for e in env_vars:
var, val = e.split('=')
os.environ[var] = val
pkg = rospkg.get_package_name(roslaunch_path)
r = rospkg.RosPack()
pkg_dir = r.get_path(pkg)
if os.path.isfile(roslaunch_path):
error_msg = check_roslaunch_file(roslaunch_path)
outname = os.path.basename(roslaunch_path).replace('.', '_')
else:
print("checking *.launch in directory", roslaunch_path)
error_msg = check_roslaunch_dir(roslaunch_path)
abspath = os.path.abspath
relpath = abspath(roslaunch_path)[len(abspath(pkg_dir))+1:]
outname = relpath.replace(os.sep, '_')
if outname == '.':
outname = '_pkg'
import rostest.rostestutil
roslaunch.core.add_printerrlog_handler(logger.error)
logger.info('rostest starting with options %s, args %s'%(options, args))
if len(args) == 0:
parser.error("You must supply a test file argument to rostest.")
if len(args) != 1:
parser.error("rostest only accepts a single test file")
# compute some common names we'll be using to generate test names and files
test_file = args[0]
if options.pkg_dir and options.package:
# rosbuild2: the build system knows what package and directory, so let it tell us,
# instead of shelling back out to rospack
pkg_dir, pkg = options.pkg_dir, options.package
else:
pkg = rospkg.get_package_name(test_file)
r = rospkg.RosPack()
pkg_dir = r.get_path(pkg)
if options.results_filename:
outname = options.results_filename
if '.' in outname:
outname = outname[:outname.rfind('.')]
else:
outname = rostest_name_from_path(pkg_dir, test_file)
env = None
if options.results_base_dir:
env = {ROS_TEST_RESULTS_DIR: options.results_base_dir}
# #1140
if not os.path.isfile(test_file):
def check_main():
if len(sys.argv) < 2:
usage()
if '--rostest' in sys.argv[1:]:
if len(sys.argv) != 4:
usage()
test_pkg, test_file = [a for a in sys.argv[1:] if a != '--rostest']
# this logic derives the output filename that rostest uses
r = rospkg.RosPack()
pkg_name = rospkg.get_package_name(test_file)
pkg_dir = r.get_path(pkg_name)
# compute test name for friendlier reporting
outname = rosunit.rostest_name_from_path(pkg_dir, test_file)
test_file = rosunit.xml_results_file(test_pkg, outname, is_rostest=True)
else:
if len(sys.argv) != 2:
usage()
test_file = sys.argv[1]
print('Checking for test results in %s' % test_file)
if not os.path.exists(test_file):
if not os.path.exists(os.path.dirname(test_file)):
os.makedirs(os.path.dirname(test_file))
test_file = args[0]
if options.test_name:
test_name = options.test_name
else:
test_name = os.path.basename(test_file)
if '.' in test_name:
test_name = test_name[:test_name.rfind('.')]
time_limit = float(options.time_limit) if options.time_limit else None
# If the caller didn't tell us the package name, we'll try to infer it.
# compute some common names we'll be using to generate test names and files
pkg = options.pkg
if not pkg:
pkg = rospkg.get_package_name(test_file)
if not pkg:
print("Error: failed to determine package name for file '%s'; maybe you should supply the --package argument to rosunit?" % (test_file))
sys.exit(1)
try:
runner_result = None
results = Result('rosunit', 0, 0, 0)
test_case = BareTestCase(test_file, args[1:],
retry=0, time_limit=time_limit,
test_name=test_name, text_mode=options.text_mode, package_name=pkg)
suite = unittest.TestSuite()
suite.addTest(test_case)
if options.text_mode:
result = unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite)
static_plugins, online_plugins = [], []
# - do a ros_root check first and abort if it fails as rest of tests are useless after that
error = ros_root_check(None, ros_root=os.environ['ROS_ROOT'])
if error:
print("ROS_ROOT is invalid: "+str(error))
sys.exit(1)
all_warnings = []
all_errors = []
if launch_files:
ctx = WtfContext.from_roslaunch(launch_files)
#TODO: allow specifying multiple roslaunch files
else:
curr_package = rospkg.get_package_name('.')
if curr_package:
print("Package:", curr_package)
ctx = WtfContext.from_package(curr_package)
#TODO: load all .launch files in package
elif os.path.isfile('stack.xml'):
curr_stack = os.path.basename(os.path.abspath('.'))
print("Stack:", curr_stack)
ctx = WtfContext.from_stack(curr_stack)
else:
print("No package or stack in the current directory")
ctx = WtfContext.from_env()
if options.all_packages:
print("roswtf will run against all packages")
ctx.pkgs = all_pkgs
# static checks
def get_file_dependencies(f, stdout=sys.stdout, stderr=sys.stderr, rospack=None):
"""
Compute dependencies of the specified message/service file
@param f: message or service file to get dependencies for
@type f: str
@param stdout pipe: stdout pipe
@type stdout: file
@param stderr pipe: stderr pipe
@type stderr: file
@return: 'files': list of files that \a file depends on,
'deps': list of dependencies by type, 'spec': Msgs/Srvs
instance.
@rtype: dict
"""
package = rospkg.get_package_name(f)
spec = None
if f.endswith(roslib.msgs.EXT):
_, spec = roslib.msgs.load_from_file(f)
elif f.endswith(roslib.srvs.EXT):
_, spec = roslib.srvs.load_from_file(f)
else:
raise Exception('[%s] does not appear to be a message or service' % spec)
return get_dependencies(spec, package, stdout, stderr, rospack=rospack)
file_deps is a { filename : RoslaunchDeps } dictionary of
roslaunch dependency information to update, indexed by roslaunch
file name.
missing is a { package : [packages] } dictionary of missing
manifest dependencies, indexed by package.
@rtype: str, dict, dict
"""
file_deps = {}
missing = {}
base_pkg = None
for launch_file in files:
if not os.path.exists(launch_file):
raise RoslaunchDepsException("roslaunch file [%s] does not exist"%launch_file)
pkg = rospkg.get_package_name(os.path.dirname(os.path.abspath(launch_file)))
if base_pkg and pkg != base_pkg:
raise RoslaunchDepsException("roslaunch files must be in the same package: %s vs. %s"%(base_pkg, pkg))
base_pkg = pkg
rl_file_deps(file_deps, launch_file, verbose)
calculate_missing(base_pkg, missing, file_deps, use_test_depends=use_test_depends)
return base_pkg, file_deps, missing
def print_deps(base_pkg, file_deps, verbose):
pkgs = []
# for verbose output we print extra source information
if verbose:
for f, deps in file_deps.items():
for p, t in deps.nodes:
print("%s [%s/%s]"%(p, p, t))
pkg = rospkg.get_package_name(os.path.dirname(os.path.abspath(f)))
if pkg is None: #cannot determine package
print("ERROR: cannot determine package for [%s]"%pkg, file=sys.stderr)
else:
print("%s [%s]"%(pkg, f))
print('-'*80)
# print out list of package dependencies
pkgs = []
for deps in file_deps.values():
pkgs.extend(deps.pkgs)
# print space-separated to be friendly to rosmake
print(' '.join([p for p in set(pkgs)]))
mainly used as a subroutine of roslaunch_deps().
@param base_pkg: name of package where initial walk begins (unused).
@type base_pkg: str
@param missing: dictionary mapping package names to set of missing package dependencies.
@type missing: { str: set(str) }
@param file_deps: dictionary mapping launch file names to RoslaunchDeps of each file
@type file_deps: { str: RoslaunchDeps}
@param use_test_depends [bool]: use test_depends as installed package
@type use_test_depends: [bool]
@return: missing (see parameter)
@rtype: { str: set(str) }
"""
rospack = rospkg.RosPack()
for launch_file in file_deps.keys():
pkg = rospkg.get_package_name(os.path.dirname(os.path.abspath(launch_file)))
if pkg is None: #cannot determine package
print("ERROR: cannot determine package for [%s]"%pkg, file=sys.stderr)
continue
m = rospack.get_manifest(pkg)
d_pkgs = set([d.name for d in m.depends])
if m.is_catkin:
# for catkin packages consider the run dependencies instead
# else not released packages will not appear in the dependency list
# since rospkg does uses rosdep to decide which dependencies to return
from catkin_pkg.package import parse_package
p = parse_package(os.path.dirname(m.filename))
d_pkgs = set([d.name for d in p.run_depends])
if use_test_depends:
for d in p.test_depends:
d_pkgs.add(d.name)