Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
from pymavlink import mavutil
master = mavutil.mavlink_connection("udp::14555", input=False, dialect="array_test")
while True:
master.mav.system_time_send(1,2)
master.mav.array_test_0_send(1, [-3, -2, -1, 0], [1,2,3,4], [5,6,7,8], [9,10,11,12])
master.mav.array_test_1_send([1,2,3,4])
master.mav.array_test_3_send(1, [2,3,4,5])
master.mav.array_test_4_send([1,2,3,4], 5)
master.mav.array_test_5_send("test1", "test2")
master.mav.array_test_6_send(1,2,3, [4,5], [6,7], [8,9], [10,11], [12,13], [14,15], "long value", [1.1, 2.2], [3.3, 4.4])
master.mav.array_test_7_send([1.1, 2.2], [3.3, 4.4],
[4,5], [6,7], [8,9], [10,11], [12,13], [14,15], "long value")
master.mav.array_test_8_send(1, [2.2, 3.3], [14,15])
time.sleep(1)
master.close()
def get_mavlink_connection_going(self):
# get a mavlink connection going
connection_string = self.autotest_connection_string_to_mavproxy()
try:
self.mav = mavutil.mavlink_connection(connection_string,
robust_parsing=True,
source_component=250)
except Exception as msg:
self.progress("Failed to start mavlink connection on %s: %s" %
(connection_string, msg,))
raise
self.mav.message_hooks.append(self.message_hook)
self.mav.idle_hooks.append(self.idle_hook)
def __init__(self, fc_device, local_port):
self._port = str(local_port)
if self._port in UdpPass.consumed_ports:
raise Exception('port already used!')
UdpPass.consumed_ports.append(self._port)
self._ip = UdpPass.get_ip_address()
self._connection_string = 'udp:' + self._ip + ':' + self._port
self._mavudp = mavutil.mavlink_connection(self._connection_string)
# Create serial device
try:
self._mavserial = mavutil.mavlink_connection(fc_device,baud=921600)
except serial.serialutil.SerialException :
print("\n\tSerial device " + fc_device +" not available\n")
exit(1)
self._data = {}
print(self._connection_string)
#print(self._mavserial.baud)
yaw = math.radians(mavextra.wrap_360(math.degrees(yaw)))
self.pos = pos_ned
self.att = [math.degrees(roll), math.degrees(pitch), math.degrees(yaw)]
self.frame_count += 1
yaw_cd = int(mavextra.wrap_360(math.degrees(yaw)) * 100)
if yaw_cd == 0:
# the yaw extension to GPS_INPUT uses 0 as no yaw support
yaw_cd = 36000
time_us = int(now * 1.0e6)
if now - last_origin_send > 1 and self.vicon_settings.vision_rate > 0:
# send a heartbeat msg
mav.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS, mavutil.mavlink.MAV_AUTOPILOT_GENERIC, 0, 0, 0)
# send origin at 1Hz
mav.mav.set_gps_global_origin_send(self.target_system,
int(self.vicon_settings.origin_lat*1.0e7),
int(self.vicon_settings.origin_lon*1.0e7),
int(self.vicon_settings.origin_alt*1.0e3),
time_us)
last_origin_send = now
if self.vicon_settings.gps_rate > 0 and now_ms - last_gps_send_ms > gps_period_ms:
'''send GPS data at the specified rate, trying to align on the given period'''
if last_gps_pos is None:
last_gps_pos = pos_ned
gps_lat, gps_lon = mavextra.gps_offset(self.vicon_settings.origin_lat,
self.vicon_settings.origin_lon,
pos_ned.y, pos_ned.x)
logfile = mavproxy.match.group(1)
print 'Saving log %s' % (logfile,)
# the received parameters can come before or after the ready to fly message
mavproxy.expect(['Received [0-9]+ parameters', 'Ready to FLY'])
mavproxy.expect(['Received [0-9]+ parameters', 'Ready to FLY'])
mavproxy.send('module load mmap\n')
util.expect_setup_callback(mavproxy, common.expect_callback)
common.expect_list_clear()
common.expect_list_extend([sim, sil, mavproxy])
# get a mavlink connection going
try:
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception, msg:
error("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
mav.message_hooks.append(common.message_hook)
mav.idle_hooks.append(common.idle_hook)
failed = False
e = 'None'
try:
mav.wait_heartbeat()
arducopter.setup_rc(mavproxy)
print 'Calibrating level...'
if not arducopter.calibrate_level(mavproxy, mav):
error('Failed to calibrate level.')
failed = True
type = m.get_type()
if type == 'MISSION_ITEM':
try:
while m.seq > wp.count():
print("Adding dummy WP %u" % wp.count())
wp.set(m, wp.count())
wp.set(m, m.seq)
except Exception:
pass
continue
if type == 'CMD':
m = mavutil.mavlink.MAVLink_mission_item_message(0,
0,
m.CNum,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
m.CId,
0, 1,
m.Prm1, m.Prm2, m.Prm3, m.Prm4,
m.Lat, m.Lng, m.Alt)
try:
while m.seq > wp.count():
print("Adding dummy WP %u" % wp.count())
wp.set(m, wp.count())
wp.set(m, m.seq)
except Exception:
pass
continue
if not mlog.check_condition(options.condition):
continue
if options.mode is not None and mlog.flightmode.lower() != options.mode.lower():
continue
'''in-memory mavlink log'''
from pymavlink import mavutil
class mavmemlog(mavutil.mavfile):
'''a MAVLink log in memory. This allows loading a log into
memory to make it easier to do multiple sweeps over a log'''
def __init__(self, mav, progress_callback=None):
mavutil.mavfile.__init__(self, None, 'memlog')
self._msgs = []
self._count = 0
self.rewind()
self._flightmodes = []
last_flightmode = None
last_timestamp = None
last_pct = 0
while True:
m = mav.recv_msg()
if m is None:
break
def send_fence(self):
'''send fence points from fenceloader. Taken from fence module'''
# must disable geo-fencing when loading
self.fenceloader.target_system = self.target_system
self.fenceloader.target_component = self.target_component
self.fenceloader.reindex()
action = self.get_mav_param('FENCE_ACTION', mavutil.mavlink.FENCE_ACTION_NONE)
self.param_set('FENCE_ACTION', mavutil.mavlink.FENCE_ACTION_NONE, 3)
self.param_set('FENCE_TOTAL', self.fenceloader.count(), 3)
for i in range(self.fenceloader.count()):
p = self.fenceloader.point(i)
self.master.mav.send(p)
p2 = self.fetch_fence_point(i)
if p2 is None:
self.param_set('FENCE_ACTION', action, 3)
return False
if (p.idx != p2.idx or
abs(p.lat - p2.lat) >= 0.00003 or
abs(p.lng - p2.lng) >= 0.00003):
print("Failed to send fence point %u" % i)
self.param_set('FENCE_ACTION', action, 3)
return False
self.param_set('FENCE_ACTION', action, 3)
return True
def reset(self):
self.out_queue = Queue()
if hasattr(self.master, 'reset'):
self.master.reset()
else:
try:
self.master.close()
except:
pass
self.master = mavutil.mavlink_connection(self.master.address)
def handle_px4_param_value(self, m):
'''special handling for the px4 style of PARAM_VALUE'''
if m.param_type == mavutil.mavlink.MAV_PARAM_TYPE_REAL32:
# already right type
return m.param_value
is_px4_params = False
if m.get_srcComponent() in [mavutil.mavlink.MAV_COMP_ID_UDP_BRIDGE]:
# ESP8266 uses PX4 style parameters
is_px4_params = True
sysid = m.get_srcSystem()
if self.autopilot_type_by_sysid.get(sysid,-1) in [mavutil.mavlink.MAV_AUTOPILOT_PX4]:
is_px4_params = True
if not is_px4_params:
return m.param_value
# try to extract px4 param value
value = m.param_value
try:
v = struct.pack(">f", value)
except Exception:
return value
if m.param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT8:
value, = struct.unpack(">B", v[3:])
elif m.param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT8:
value, = struct.unpack(">b", v[3:])
elif m.param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT16:
value, = struct.unpack(">H", v[2:])
elif m.param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT16: