Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sitl = util.start_SITL(binary, model='rover', home=home, speedup=10, gdb=False)
mavproxy = util.start_MAVProxy_SITL('APMrover2', options=options)
mavproxy.expect('Telemetry log: (\S+)')
logfile = mavproxy.match.group(1)
print("LOGFILE %s" % logfile)
mavproxy.expect('Received [0-9]+ parameters')
util.expect_setup_callback(mavproxy, expect_callback)
expect_list_clear()
expect_list_extend([sitl, mavproxy])
# get a mavlink connection going
try:
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception as msg:
print("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
mav.message_hooks.append(message_hook)
mav.idle_hooks.append(idle_hook)
try:
print("Waiting for a heartbeat with mavlink protocol %s" % mav.WIRE_PROTOCOL_VERSION)
mav.wait_heartbeat()
print("Setting up RC parameters")
setup_rc(mavproxy)
print("Waiting for GPS fix")
mav.wait_gps_fix()
homeloc = mav.location()
def magfit(logfile):
'''find best magnetometer offset fit to a log file'''
print("Processing log %s" % filename)
mlog = mavutil.mavlink_connection(filename)
data = []
throttle = 0
# now gather all the data
while True:
m = mlog.recv_match(condition=args.condition)
if m is None:
break
if m.get_type() == "MAG":
mag = Vector3(m.MagX, m.MagY, m.MagZ)
data.append((mag, throttle))
#if m.get_type() == "RCOU":
# throttle = math.sqrt(m.C1/500.0)
if m.get_type() == "CTUN":
throttle = m.ThO
def magfit(logfile):
'''find best magnetometer offset fit to a log file'''
print("Processing log %s" % filename)
mlog = mavutil.mavlink_connection(filename, notimestamps=args.notimestamps)
data = []
last_t = 0
offsets = Vector3(0,0,0)
# now gather all the data
while True:
m = mlog.recv_match(condition=args.condition)
if m is None:
break
if m.get_type() == "SENSOR_OFFSETS":
# update current offsets
offsets = Vector3(m.mag_ofs_x, m.mag_ofs_y, m.mag_ofs_z)
if m.get_type() == "RAW_IMU":
mag = Vector3(m.xmag, m.ymag, m.zmag)
def magfit(logfile):
'''find best magnetometer rotation fit to a log file'''
print("Processing log %s" % filename)
mlog = mavutil.mavlink_connection(filename, notimestamps=args.notimestamps)
# generate 90 degree rotations
rotations = generate_rotations()
print("Generated %u rotations" % len(rotations))
count = 0
total_error = [0]*len(rotations)
attitude = None
gps = None
# now gather all the data
while True:
m = mlog.recv_match()
if m is None:
break
if m.get_type() == "ATTITUDE":
from pymavlink import mavlinkv10 as mavlink
from optparse import OptionParser
parser = OptionParser("mavfilter.py srcport dstport")
(opts, args) = parser.parse_args()
if len(args) < 1:
print("Usage: mavfilter.py srcport dstport ")
sys.exit(1)
srcport = args[0]
dstport = args[1]
# gee python string apend is stupid, whatever. "tcp:localhost:" += srcport gives: SyntaxError: invalid syntax
msrc = mavutil.mavlink_connection("".join(('tcp:localhost:',srcport)), planner_format=False,
notimestamps=True,
robust_parsing=True)
mdst = mavutil.mavlink_connection("".join(('tcp:localhost:',dstport)), planner_format=False,
notimestamps=True,
robust_parsing=True)
# simple basic byte pass through, no logging or viewing of packets, or analysis etc
#while True:
# # L -> R
# m = msrc.recv();
# mdst.write(m);
# # R -> L
# m2 = mdst.recv();
# msrc.write(m2);
def IMUCheckFail(filename):
try:
mlog = mavutil.mavlink_connection(filename)
except Exception:
return False
accel1 = None
accel2 = None
gyro1 = None
gyro2 = None
t1 = 0
t2 = 0
ecount_accel = [0]*3
ecount_gyro = [0]*3
athreshold = 3.0
gthreshold = 30.0
count_threshold = 100
imu1_count = 0
imu2_count = 0
def flight_modes(logfile):
'''show flight modes for a log file'''
print("Processing log %s" % filename)
mlog = mavutil.mavlink_connection(filename)
mode = ""
previous_mode = ""
mode_start_timestamp = -1
time_in_mode = {}
previous_percent = -1
seconds_per_percent = -1
filesize = os.path.getsize(filename)
while True:
m = mlog.recv_match(type=['SYS_STATUS','HEARTBEAT','MODE'],
condition='MAV.flightmode!="%s"' % mlog.flightmode)
if m is None:
break
print('%s MAV.flightmode=%-12s (MAV.timestamp=%u %u%%)' % (
def cmd_output_add(self, args):
'''add new output'''
device = args[0]
print("Adding output %s" % device)
try:
conn = mavutil.mavlink_connection(device, input=False, source_system=self.settings.source_system)
conn.mav.srcComponent = self.settings.source_component
except Exception:
print("Failed to connect to %s" % device)
return
self.mpstate.mav_outputs.append(conn)
try:
mp_util.child_fd_list_add(conn.port.fileno())
except Exception:
pass
def get(self, conn):
if conn not in self._manager:
self._manager[conn] = mavlink_connection(conn)
return self._manager[conn]
thread for this thread to survive.
Args:
device: an address to the drone (e.g. "tcp:127.0.0.1:5760")
(see mavutil mavlink connection for valid options)
threaded: whether or not to run the message read loop on a
separate thread (default: {False})
"""
# call the superclass constructor
super().__init__(threaded)
# create the connection
if device is not "":
self._master = mavutil.mavlink_connection(device)
# set up any of the threading, as needed
if self._threaded:
self._read_handle = threading.Thread(target=self.dispatch_loop)
self._read_handle.daemon = True
else:
self._read_handle = None
# management
self._running = False
self._target_system = 1
self._target_component = 1
self._timeout = 5 # seconds to wait of no messages before termination