Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
if 'ROSDEP_DEBUG' in os.environ:
del os.environ['ROSDEP_DEBUG']
self.old_rr = rospkg.get_ros_root()
self.old_rpp = rospkg.get_ros_package_path()
self.old_app = os.getenv(AMENT_PREFIX_PATH_ENV_VAR, None)
if 'ROS_ROOT' in os.environ:
del os.environ['ROS_ROOT']
os.environ['ROS_PACKAGE_PATH'] = os.path.join(get_test_tree_dir())
os.environ[AMENT_PREFIX_PATH_ENV_VAR] = os.path.join(get_test_tree_dir(), 'ament')
if 'ROS_PYTHON_VERSION' not in os.environ:
# avoid `test_check` failure due to warning on stderr
os.environ['ROS_PYTHON_VERSION'] = sys.version[0]
def get_ros_root(required=False, env=None):
"""
Get the value of ROS_ROOT.
@param env: override environment dictionary
@type env: dict
@param required: if True, fails with ROSException
@return: Value of ROS_ROOT environment
@rtype: str
@raise ROSException: if require is True and ROS_ROOT is not set
"""
if env is None:
env = os.environ
ros_root = rospkg.get_ros_root(env)
if required and not ros_root:
raise rospy.exceptions.ROSException('%s is not set'%rospkg.environment.ROS_ROOT)
return ros_root
def rle_wrapper(fn):
"""
Wrap lower-level exceptions in RLException class
:returns:: function wrapper that throws an RLException if the
wrapped function throws an Exception, ``fn``
"""
def wrapped_fn(*args):
try:
return fn(*args)
except Exception as e:
# we specifically catch RLExceptions and print their messages differently
raise RLException("ERROR: %s"%e)
return wrapped_fn
get_ros_root = rospkg.get_ros_root
get_master_uri_env = rle_wrapper(rosgraph.get_master_uri)
get_ros_package_path = rospkg.get_ros_package_path
def remap_localhost_uri(uri, force_localhost=False):
"""
Resolve localhost addresses to an IP address so that
:param uri: XML-RPC URI, ``str``
:param force_localhost: if True, URI is mapped onto the local machine no matter what, ``bool``
"""
hostname, port = rosgraph.network.parse_http_host_and_port(uri)
if force_localhost or hostname == 'localhost':
return rosgraph.network.create_local_xmlrpc_uri(port)
else:
return uri
##################################################################
logfile_dir = os.path.dirname(log_filename)
if not os.path.exists(logfile_dir):
try:
makedirs_with_parent_perms(logfile_dir)
except OSError:
# cannot print to screen because command-line tools with output use this
sys.stderr.write("WARNING: cannot create log directory [%s]. Please set %s to a writable location.\n"%(logfile_dir, ROS_LOG_DIR))
return None
elif os.path.isfile(logfile_dir):
raise LoggingException("Cannot save log files: file [%s] is in the way"%logfile_dir)
if 'ROS_PYTHON_LOG_CONFIG_FILE' in os.environ:
config_file = os.environ['ROS_PYTHON_LOG_CONFIG_FILE']
else:
config_file = os.path.join(get_ros_root(env=env), 'config', 'python_logging.conf')
if not os.path.isfile(config_file):
# logging is considered soft-fail
sys.stderr.write("WARNING: cannot load logging configuration file, logging is disabled\n")
return log_filename
# pass in log_filename as argument to pylogging.conf
os.environ['ROS_LOG_FILENAME'] = log_filename
# #3625: disabling_existing_loggers=False
logging.config.fileConfig(config_file, disable_existing_loggers=False)
return log_filename
def get_cwd(cwd, binary=''):
result = ''
if cwd == 'node':
result = os.path.dirname(binary)
elif cwd == 'cwd':
result = os.getcwd()
elif cwd == 'ros-root':
result = rospkg.get_ros_root()
else:
result = rospkg.get_ros_home()
if not os.path.exists(result):
try:
os.makedirs(result)
except OSError:
# exist_ok=True
pass
return result