Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self, initial_vrep_port):
self.simulation_id = vrep.simxStart('127.0.0.1', initial_vrep_port, True, False, 5000, 5)
while not rospy.is_shutdown():
# We loop as fast as possible, in any case V-Rep can't stream more than 20Hz
# We're surely loosing a bunch of ms here...
vrep.simxGetPingTime(self.simulation_id) # Just to force V-Rep updating its simulation time
vrep_time = vrep.simxGetLastCmdTime(self.simulation_id)
ros_time = rospy.Time.from_sec(vrep_time/1000.)
self.clock_pub.publish(Clock(clock=ros_time))
def start_simulation(self):
vrep.simxStartSimulation(self.simulation_id, vrep.simx_opmode_oneshot)
def update(self):
for obj in self._objects:
if 'handle' in self._objects[obj]:
error, pos = vrep.simxGetObjectPosition(self.simulation_id, self._objects[obj]['handle'], self.base,
vrep.simx_opmode_buffer if self._objects[obj]['initialized'] else vrep.simx_opmode_streaming)
if error == 0:
self._objects[obj]["position"] = pos
self._objects[obj]['initialized'] = True
def __init__(self, objects, simulation_id=0):
self._objects = {}
self.simulation_id = simulation_id
for obj in objects:
self._objects[obj] = {'initialized': False}
error, handle = vrep.simxGetObjectHandle(self.simulation_id, obj, vrep.simx_opmode_blocking)
if error == 0:
self._objects[obj]["handle"] = handle
self.base = -1 # Will return poses in world frame
self.update()
def stop_simulation(self):
vrep.simxStopSimulation(self.simulation_id, vrep.simx_opmode_oneshot)
def update(self):
for joint in self._joints:
if 'handle' in self._joints[joint]:
error, pos = vrep.simxGetJointPosition(self.simulation_id, self._joints[joint]['handle'],
vrep.simx_opmode_buffer if self._joints[joint]['initialized'] else vrep.simx_opmode_streaming)
if error == 0:
self._joints[joint]["position"] = pos
color = self.conversions.ball_to_color(ball_state)
self.light_pub.publish(UInt8(color))
sound = self.conversions.ball_to_sound(ball_state)
self.sound_pub.publish(Float32(sound))
distance = norm(array(objects[self.ball_name]['position']) - array(objects[self.arena_name]['position']))
if distance > 0.25:
self.reset_ball()
self.rate.sleep()
finally:
self.stop_simulation()
sleep(0.5)
vrep.simxFinish(self.simulation_id)
import os
import time
import ctypes
from threading import Lock
from .remoteApiBindings import vrep as remote_api
from ..robot.io import AbstractIO
vrep_error = {
remote_api.simx_return_ok: 'Ok',
remote_api.simx_return_novalue_flag: 'No value',
remote_api.simx_return_timeout_flag: 'Timeout',
remote_api.simx_return_illegal_opmode_flag: 'Opmode error',
remote_api.simx_return_remote_error_flag: 'Remote error',
remote_api.simx_return_split_progress_flag: 'Progress error',
remote_api.simx_return_local_error_flag: 'Local error',
remote_api.simx_return_initialize_error_flag: 'Init error'
}
vrep_mode = {
'normal': remote_api.simx_opmode_oneshot_wait,
'streaming': remote_api.simx_opmode_streaming,
'sending': remote_api.simx_opmode_oneshot,
}
def __init__(self):
self.joy_pub = rospy.Publisher('sensors/joystick/1', Joy, queue_size=1)
self.joy_pub2 = rospy.Publisher('sensors/joystick/2', Joy, queue_size=1)
self.ball_pub = rospy.Publisher('environment/ball', CircularState, queue_size=1)
self.light_pub = rospy.Publisher('environment/light', UInt8, queue_size=1)
self.sound_pub = rospy.Publisher('environment/sound', Float32, queue_size=1)
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f:
self.ergo_params = json.load(f)
with open(join(self.rospack.get_path('apex_playground'), 'config', 'environment.json')) as f:
self.env_params = json.load(f)
self.rate = rospy.Rate(self.ergo_params['publish_rate'])
vrep_port = rospy.get_param('vrep/environment_port', 29997)
self.simulation_id = vrep.simxStart('127.0.0.1', vrep_port, True, False, 5000, 5)
# Object names in V-Rep
self.joystick_left_joints = ['Joystick_2_Axis_2', 'Joystick_2_Axis_1']
self.joystick_right_joints = ['Joystick_1_Axis_2', 'Joystick_1_Axis_1']
self.ball_name = 'TennisBall'
self.arena_name = 'Arena'
if self.ergo_params["control_joystick_id"] != 2:
useless_joy = self.joystick_left_joints
self.joystick_left_joints = self.joystick_right_joints
self.joystick_right_joints = useless_joy
self.joints = JointTracker(self.joystick_left_joints + self.joystick_right_joints, self.simulation_id)
self.objects = ObjectTracker([self.ball_name, self.arena_name], self.simulation_id)
self.conversions = EnvironmentConversions()