Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# open any mavlink output ports
for port in opts.output:
mpstate.mav_outputs.append(mavutil.mavlink_connection(port, baud=int(opts.baudrate), input=False))
if opts.sitl:
mpstate.sitl_output = mavutil.mavudp(opts.sitl, input=False)
mpstate.settings.streamrate = opts.streamrate
mpstate.settings.streamrate2 = opts.streamrate
if opts.state_basedir is not None:
mpstate.settings.state_basedir = opts.state_basedir
msg_period = mavutil.periodic_event(1.0/15)
heartbeat_period = mavutil.periodic_event(1)
heartbeat_check_period = mavutil.periodic_event(0.33)
mpstate.input_queue = Queue.Queue()
mpstate.input_count = 0
mpstate.empty_input_count = 0
if opts.setup:
mpstate.rl.set_prompt("")
# call this early so that logdir is setup based on --aircraft
(mpstate.status.logdir, logpath_telem, logpath_telem_raw) = log_paths()
for module in opts.load_module:
modlist = module.split(',')
for mod in modlist:
process_stdin('module load %s' % (mod))
context = zmq.Context()
socket_sub = context.socket(zmq.SUB)
socket_sub.connect('tcp://%s:%d'%config.zmq_pub_unreal_proxy)
socket_sub.setsockopt(zmq.SUBSCRIBE,config.topic_unreal_state)
socket_sub.setsockopt(zmq.SUBSCRIBE,config.topic_unreal_drone_rgb_camera%drone_num)
mav1 = mavutil.mavlink_connection('udp:127.0.0.1:%d'%(14551+0*10))
print("Waiting for HEARTBEAT")
mav1.wait_heartbeat()
print("Heartbeat from APM (system %u component %u)" % (mav1.target_system, mav1.target_component))
event = mavutil.periodic_event(0.3)
event1hz = mavutil.periodic_event(1)
freq=30
pub_position_event = mavutil.periodic_event(freq)
def set_rcs(rc1, rc2, rc3, rc4):
global mav1
values = [ 1500 ] * 8
values[0] = rc1
values[1] = rc2
values[2] = rc3
values[3] = rc4
mav1.mav.rc_channels_override_send(mav1.target_system, mav1.target_component, *values)
def get_position_struct(mav):
if not 'VFR_HUD' in mav1.messages:
return None
d={}
def __init__(self):
self.is_armed = False
self.last_fence_fetch = 0
self.last_mission_fetch = 0
self.last_rally_fetch = 0
self.done_heartbeat_check = 0
# an altitude should always be within a few metres of when disarmed:
self.disarmed_alt = 584
self.rate_period = mavutil.periodic_event(1.0/15)
self.done_map_menu = False
def __init__(self, mpstate):
super(RCModule, self).__init__(mpstate, "rc", "rc command handling", public = True)
self.count = 18
self.override = [ 0 ] * self.count
self.last_override = [ 0 ] * self.count
self.override_counter = 0
x = "|".join(str(x) for x in range(1, (self.count+1)))
self.add_command('rc', self.cmd_rc, "RC input control", ['<%s|all>' % x])
self.add_command('switch', self.cmd_switch, "flight mode switch control", ['<0|1|2|3|4|5|6>'])
self.rc_settings = mp_settings.MPSettings(
[('override_hz', float, 5.0)])
if self.sitl_output:
self.rc_settings.override_hz = 20.0
self.add_completion_function('(RCSETTING)',
self.rc_settings.completion)
self.override_period = mavutil.periodic_event(self.rc_settings.override_hz)
context = zmq.Context()
socket_sub = context.socket(zmq.SUB)
socket_sub.connect('tcp://%s:%d'%config.zmq_pub_unreal_proxy)
socket_sub.setsockopt(zmq.SUBSCRIBE,config.topic_unreal_state)
socket_sub.setsockopt(zmq.SUBSCRIBE,config.topic_unreal_drone_rgb_camera%drone_num)
mav1 = mavutil.mavlink_connection('udp:127.0.0.1:%d'%(14551+drone_num*10))
print("Waiting for HEARTBEAT")
mav1.wait_heartbeat()
print("Heartbeat from APM (system %u component %u)" % (mav1.target_system, mav1.target_component))
event = mavutil.periodic_event(0.3)
event1hz = mavutil.periodic_event(1)
freq=30
pub_position_event = mavutil.periodic_event(freq)
def set_rcs(rc1, rc2, rc3, rc4):
global mav1
values = [ 1500 ] * 8
values[0] = rc1
values[1] = rc2
values[2] = rc3
values[3] = rc4
mav1.mav.rc_channels_override_send(mav1.target_system, mav1.target_component, *values)
def get_position_struct(mav):
if not 'VFR_HUD' in mav1.messages:
return None
d={}
socket_sub = context.socket(zmq.SUB)
socket_pub.bind("tcp://*:%d" % config.zmq_pub_drone_main[1] )
socket_sub.connect('tcp://%s:%d'%config.zmq_pub_unreal_proxy)
socket_sub.setsockopt(zmq.SUBSCRIBE,config.topic_unreal_state)
mav1 = mavutil.mavlink_connection('udp:127.0.0.1:14551')
print("Waiting for HEARTBEAT")
mav1.wait_heartbeat()
print("Heartbeat from APM (system %u component %u)" % (mav1.target_system, mav1.target_system))
event = mavutil.periodic_event(0.3)
freq=30
pub_position_event = mavutil.periodic_event(freq)
def set_rcs(rc1, rc2, rc3, rc4):
global mav1
values = [ 1500 ] * 8
values[0] = rc1
values[1] = rc2
values[2] = rc3
values[3] = rc4
mav1.mav.rc_channels_override_send(mav1.target_system, mav1.target_component, *values)
def get_position_struct(mav):
d={}
d['posz']=mav1.messages['VFR_HUD'].alt
sm=mav1.messages['SIMSTATE']
home=mav1.messages['HOME']
lng_factor=np.cos(np.radians(sm.lng/1.0e7))
self.per_cell = 0
self.servo_voltage = -1
self.high_servo_voltage = -1
self.last_servo_warn_time = 0
self.last_vcc_warn_time = 0
self.settings.append(
MPSetting('battwarn', int, 1, 'Battery Warning Time', tab='Battery'))
self.settings.append(
MPSetting('batwarncell', float, 3.7, 'Battery cell Warning level'))
self.settings.append(
MPSetting('servowarn', float, 4.3, 'Servo voltage warning level'))
self.settings.append(
MPSetting('vccwarn', float, 4.3, 'Vcc voltage warning level'))
self.settings.append(MPSetting('numcells', int, 0, range=(0,50), increment=1))
self.battery_period = mavutil.periodic_event(5)
topic_postition=config.topic_sitl_position_report
context = zmq.Context()
socket_sub = context.socket(zmq.SUB)
socket_sub.connect('tcp://%s:%d'%config.zmq_pub_unreal_proxy)
socket_sub.setsockopt(zmq.SUBSCRIBE,config.topic_unreal_state)
socket_sub.setsockopt(zmq.SUBSCRIBE,config.topic_unreal_drone_rgb_camera%drone_num)
mav1 = mavutil.mavlink_connection('udp:127.0.0.1:%d'%(14551+0*10))
print("Waiting for HEARTBEAT")
mav1.wait_heartbeat()
print("Heartbeat from APM (system %u component %u)" % (mav1.target_system, mav1.target_component))
event = mavutil.periodic_event(0.3)
event1hz = mavutil.periodic_event(1)
freq=30
pub_position_event = mavutil.periodic_event(freq)
def set_rcs(rc1, rc2, rc3, rc4):
global mav1
values = [ 1500 ] * 8
values[0] = rc1
values[1] = rc2
values[2] = rc3
values[3] = rc4
mav1.mav.rc_channels_override_send(mav1.target_system, mav1.target_component, *values)
def get_position_struct(mav):
if not 'VFR_HUD' in mav1.messages:
return None
def __init__(self, mpstate):
super(WPModule, self).__init__(mpstate, "wp", "waypoint handling", public = True)
self.wp_op = None
self.wp_requested = {}
self.wp_received = {}
self.wp_save_filename = None
self.wploader_by_sysid = {}
self.loading_waypoints = False
self.loading_waypoint_lasttime = time.time()
self.last_waypoint = 0
self.wp_period = mavutil.periodic_event(0.5)
self.undo_wp = None
self.undo_type = None
self.undo_wp_idx = -1
self.wploader.expected_count = 0
self.add_command('wp', self.cmd_wp, 'waypoint management',
["",
" (FILENAME)"])
if self.continue_mode and self.logdir is not None:
waytxt = os.path.join(mpstate.status.logdir, 'way.txt')
if os.path.exists(waytxt):
self.wploader.load(waytxt)
print("Loaded waypoints from %s" % waytxt)
self.menu_added_console = False
self.menu_added_map = False
socket_pub = context.socket(zmq.PUB)
socket_sub = context.socket(zmq.SUB)
socket_pub.bind("tcp://*:%d" % (config.zmq_pub_drone_main[1]+drone_num))
socket_sub.connect('tcp://%s:%d'%config.zmq_pub_unreal_proxy)
socket_sub.setsockopt(zmq.SUBSCRIBE,config.topic_unreal_state)
socket_sub.setsockopt(zmq.SUBSCRIBE,config.topic_unreal_drone_rgb_camera%drone_num)
mav1 = mavutil.mavlink_connection('udp:127.0.0.1:%d'%(14551+drone_num*10))
print("Waiting for HEARTBEAT")
mav1.wait_heartbeat()
print("Heartbeat from APM (system %u component %u)" % (mav1.target_system, mav1.target_system))
event = mavutil.periodic_event(0.3)
freq=30
pub_position_event = mavutil.periodic_event(freq)
def set_rcs(rc1, rc2, rc3, rc4):
global mav1
values = [ 1500 ] * 8
values[0] = rc1
values[1] = rc2
values[2] = rc3
values[3] = rc4
mav1.mav.rc_channels_override_send(mav1.target_system, mav1.target_component, *values)
def get_position_struct(mav):
d={}
d['posz']=mav1.messages['VFR_HUD'].alt
sm=mav1.messages['SIMSTATE']