Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
(lat, lon, alt, heading) = loc.split(",")
swarminit_filepath = os.path.join(find_autotest_dir(), "swarminit.txt")
for path2 in [file_path, swarminit_filepath]:
if os.path.isfile(path2):
with open(path2, 'r') as swd:
next(swd)
for lines in swd:
if len(lines) == 0:
continue
(instance, offset) = lines.split("=")
if ((int)(instance) == (int)(cmd_opts.instance)):
(x, y, z, head) = offset.split(",")
g = mavextra.gps_offset((float)(lat), (float)(lon), (float)(x), (float)(y))
loc = str(g[0])+","+str(g[1])+","+str(alt+z)+","+str(head)
return loc
g = mavextra.gps_newpos((float)(lat), (float)(lon), 90, 20*(int)(cmd_opts.instance))
loc = str(g[0])+","+str(g[1])+","+str(alt)+","+str(heading)
return loc
def find_new_spawn(loc, file_path):
(lat, lon, alt, heading) = loc.split(",")
swarminit_filepath = os.path.join(find_autotest_dir(), "swarminit.txt")
for path2 in [file_path, swarminit_filepath]:
if os.path.isfile(path2):
with open(path2, 'r') as swd:
next(swd)
for lines in swd:
if len(lines) == 0:
continue
(instance, offset) = lines.split("=")
if ((int)(instance) == (int)(cmd_opts.instance)):
(x, y, z, head) = offset.split(",")
g = mavextra.gps_offset((float)(lat), (float)(lon), (float)(x), (float)(y))
loc = str(g[0])+","+str(g[1])+","+str(alt+z)+","+str(head)
return loc
g = mavextra.gps_newpos((float)(lat), (float)(lon), 90, 20*(int)(cmd_opts.instance))
loc = str(g[0])+","+str(g[1])+","+str(alt)+","+str(heading)
return loc
ex = None
try:
self.set_parameter("PLND_ENABLED", 1)
self.fetch_parameters()
self.set_parameter("PLND_TYPE", 4)
self.set_parameter("RNGFND1_TYPE", 1)
self.set_parameter("RNGFND1_MIN_CM", 0)
self.set_parameter("RNGFND1_MAX_CM", 4000)
self.set_parameter("RNGFND1_PIN", 0)
self.set_parameter("RNGFND1_SCALING", 12)
self.set_parameter("SIM_SONAR_SCALE", 12)
start = self.mav.location()
target = start
(target.lat, target.lng) = mavextra.gps_offset(start.lat, start.lng, 4, -4)
self.progress("Setting target to %f %f" % (target.lat, target.lng))
self.set_parameter("SIM_PLD_ENABLE", 1)
self.set_parameter("SIM_PLD_LAT", target.lat)
self.set_parameter("SIM_PLD_LON", target.lng)
self.set_parameter("SIM_PLD_HEIGHT", 0)
self.set_parameter("SIM_PLD_ALT_LMT", 15)
self.set_parameter("SIM_PLD_DIST_LMT", 10)
self.reboot_sitl()
self.progress("Waiting for location")
old_pos = self.mav.location()
self.zero_throttle()
self.takeoff(10, 1800)
self.change_mode("LAND")
print("Can't find status on GPS message")
print(m)
break
if status < 2:
continue
# flash log
lat = m.Lat
lng = getattr(m, 'Lng', None)
if lng is None:
lng = getattr(m, 'Lon', None)
if lng is None:
print("Can't find longitude on GPS message")
print(m)
break
elif type in ['EKF1', 'ANU1']:
pos = mavextra.ekf1_pos(m)
if pos is None:
continue
ekf_counter += 1
if ekf_counter % options.ekf_sample != 0:
continue
(lat, lng) = pos
elif type in ['NKF1']:
pos = mavextra.ekf1_pos(m)
if pos is None:
continue
nkf_counter += 1
if nkf_counter % options.nkf_sample != 0:
continue
(lat, lng) = pos
elif type in ['ANU5']:
(lat, lng) = (m.Alat*1.0e-7, m.Alng*1.0e-7)
if now - last_origin_send > 1 and not args.gps_only:
# send a heartbeat msg
mav.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS, mavutil.mavlink.MAV_AUTOPILOT_GENERIC, 0, 0, 0)
# send origin at 1Hz
mav.mav.set_gps_global_origin_send(args.target_sysid,
int(origin[0]*1.0e7), int(origin[1]*1.0e7), int(origin[2]*1.0e3),
time_us)
last_origin_send = now
if args.gps_rate > 0 and now_ms - last_gps_send_ms > gps_period_ms:
'''send GPS data at the specified rate, trying to align on the given period'''
if last_gps_pos is None:
last_gps_pos = pos_ned
gps_lat, gps_lon = mavextra.gps_offset(origin[0], origin[1], pos_ned[1], pos_ned[0])
gps_alt = origin[2] - pos_ned[2]
gps_dt = now - last_gps_send
gps_vel = [ (pos_ned[0]-last_gps_pos[0])/gps_dt,
(pos_ned[1]-last_gps_pos[1])/gps_dt,
(pos_ned[2]-last_gps_pos[2])/gps_dt ]
last_gps_pos = pos_ned
gps_week, gps_week_ms = get_gps_time(now)
if args.gps_nsats >= 6:
fix_type = 3
else:
fix_type = 1
mav.mav.gps_input_send(time_us, 0, 0, gps_week_ms, gps_week, fix_type,
int(gps_lat*1.0e7), int(gps_lon*1.0e7), gps_alt,
yaw_change1 = []
yaw_change2 = []
for i in range(len(data)):
(MAG,ATT,BAT) = data[i]
yaw1 = get_yaw(ATT,MAG,BAT,c)
corrected['Yaw'].append(yaw1)
ef1 = expected_field(ATT, yaw1)
cf = correct(MAG, BAT, c)
yaw2 = get_yaw(ATT,MAG,BAT,old_corrections)
ef2 = expected_field(ATT, yaw2)
uncorrected['Yaw'].append(yaw2)
uf = correct(MAG, BAT, old_corrections)
yaw_change1.append(mavextra.wrap_180(yaw1 - yaw2))
yaw_change2.append(mavextra.wrap_180(yaw1 - ATT.Yaw))
for axis in ['x','y','z']:
if not axis in corrected:
corrected[axis] = []
uncorrected[axis] = []
expected1[axis] = []
expected2[axis] = []
corrected[axis].append(getattr(cf, axis))
uncorrected[axis].append(getattr(uf, axis))
expected1[axis].append(getattr(ef1, axis))
expected2[axis].append(getattr(ef2, axis))
x.append(i)
c.show_parms()
fig, axs = pyplot.subplots(3, 1, sharex=True)
while True:
msg = mlog.recv_match(type=['PARM'])
if msg is None:
break
parameters[msg.Name] = msg.Value
mlog.rewind()
# extract MAG data
while True:
msg = mlog.recv_match(type=['GPS',mag_msg,'ATT','CTUN','BARO', 'BAT'], condition=args.condition)
if msg is None:
break
if msg.get_type() == 'GPS' and msg.Status >= 3 and earth_field is None:
earth_field = mavextra.expected_earth_field(msg)
(declination,inclination,intensity) = mavextra.get_mag_field_ef(msg.Lat, msg.Lng)
print("Earth field: %s strength %.0f declination %.1f degrees" % (earth_field, earth_field.length(), declination))
if msg.get_type() == 'ATT':
ATT = msg
if msg.get_type() == 'BAT':
BAT = msg
if msg.get_type() == mag_msg and ATT is not None:
if count % args.reduce == 0:
data.append((msg,ATT,BAT))
count += 1
old_corrections.offsets = Vector3(parameters.get('COMPASS_OFS%s_X' % mag_idx,0.0),
parameters.get('COMPASS_OFS%s_Y' % mag_idx,0.0),
parameters.get('COMPASS_OFS%s_Z' % mag_idx,0.0))
old_corrections.diag = Vector3(parameters.get('COMPASS_DIA%s_X' % mag_idx,1.0),
parameters.get('COMPASS_DIA%s_Y' % mag_idx,1.0),
parameters.get('COMPASS_DIA%s_Z' % mag_idx,1.0))
# get parameters
while True:
msg = mlog.recv_match(type=['PARM'])
if msg is None:
break
parameters[msg.Name] = msg.Value
mlog.rewind()
# extract MAG data
while True:
msg = mlog.recv_match(type=['GPS',mag_msg,'ATT','CTUN','BARO', 'BAT'], condition=args.condition)
if msg is None:
break
if msg.get_type() == 'GPS' and msg.Status >= 3 and earth_field is None:
earth_field = mavextra.expected_earth_field(msg)
(declination,inclination,intensity) = mavextra.get_mag_field_ef(msg.Lat, msg.Lng)
print("Earth field: %s strength %.0f declination %.1f degrees" % (earth_field, earth_field.length(), declination))
if msg.get_type() == 'ATT':
ATT = msg
if msg.get_type() == 'BAT':
BAT = msg
if msg.get_type() == mag_msg and ATT is not None:
if count % args.reduce == 0:
data.append((msg,ATT,BAT))
count += 1
old_corrections.offsets = Vector3(parameters.get('COMPASS_OFS%s_X' % mag_idx,0.0),
parameters.get('COMPASS_OFS%s_Y' % mag_idx,0.0),
parameters.get('COMPASS_OFS%s_Z' % mag_idx,0.0))
old_corrections.diag = Vector3(parameters.get('COMPASS_DIA%s_X' % mag_idx,1.0),
parameters.get('COMPASS_DIA%s_Y' % mag_idx,1.0),
if lng is None:
lng = getattr(m, 'Lon', None)
if lng is None:
print("Can't find longitude on GPS message")
print(m)
break
elif type in ['EKF1', 'ANU1']:
pos = mavextra.ekf1_pos(m)
if pos is None:
continue
ekf_counter += 1
if ekf_counter % options.ekf_sample != 0:
continue
(lat, lng) = pos
elif type in ['NKF1']:
pos = mavextra.ekf1_pos(m)
if pos is None:
continue
nkf_counter += 1
if nkf_counter % options.nkf_sample != 0:
continue
(lat, lng) = pos
elif type in ['ANU5']:
(lat, lng) = (m.Alat*1.0e-7, m.Alng*1.0e-7)
elif type in ['AHR2', 'POS', 'CHEK']:
(lat, lng) = (m.Lat, m.Lng)
elif type == 'AHRS2':
(lat, lng) = (m.lat*1.0e-7, m.lng*1.0e-7)
elif type == 'ORGN':
(lat, lng) = (m.Lat, m.Lng)
elif type == 'SIM':
(lat, lng) = (m.Lat, m.Lng)
def get_yaw(ATT,MAG,BAT,c):
'''calculate heading from raw magnetometer and new offsets'''
mag = correct(MAG, BAT, c)
# go via a DCM matrix to match the APM calculation
dcm_matrix = mavextra.rotation_df(ATT)
cos_pitch_sq = 1.0-(dcm_matrix.c.x*dcm_matrix.c.x)
headY = mag.y * dcm_matrix.c.z - mag.z * dcm_matrix.c.y
headX = mag.x * cos_pitch_sq - dcm_matrix.c.x * (mag.y * dcm_matrix.c.y + mag.z * dcm_matrix.c.z)
global declination
yaw = math.degrees(math.atan2(-headY,headX)) + declination
if yaw < 0:
yaw += 360
return yaw