Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# setup output to SITL sim
sim_out = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sim_out.connect(interpret_address(opts.simout))
sim_out.setblocking(0)
# setup possible output to FlightGear for display
fg_out = None
if opts.fgout:
fg_out = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
fg_out.connect(interpret_address(opts.fgout))
# setup wind generator
wind = util.Wind(opts.wind)
fdm = fgFDM.fgFDM()
jsb_console.send('info\n')
jsb_console.send('resume\n')
jsb.expect(["trim computation time", "Trim Results"])
time.sleep(1.5)
jsb_console.send('step\n')
jsb_console.logfile = None
print("Simulator ready to fly")
def main_loop():
"""Run main loop."""
tnow = time.time()
last_report = tnow
last_wind_update = tnow
(lat, lon, alt, heading) = loc.split(",")
swarminit_filepath = os.path.join(find_autotest_dir(), "swarminit.txt")
for path2 in [file_path, swarminit_filepath]:
if os.path.isfile(path2):
with open(path2, 'r') as swd:
next(swd)
for lines in swd:
if len(lines) == 0:
continue
(instance, offset) = lines.split("=")
if ((int)(instance) == (int)(cmd_opts.instance)):
(x, y, z, head) = offset.split(",")
g = mavextra.gps_offset((float)(lat), (float)(lon), (float)(x), (float)(y))
loc = str(g[0])+","+str(g[1])+","+str(alt+z)+","+str(head)
return loc
g = mavextra.gps_newpos((float)(lat), (float)(lon), 90, 20*(int)(cmd_opts.instance))
loc = str(g[0])+","+str(g[1])+","+str(alt)+","+str(heading)
return loc
def find_new_spawn(loc, file_path):
(lat, lon, alt, heading) = loc.split(",")
swarminit_filepath = os.path.join(find_autotest_dir(), "swarminit.txt")
for path2 in [file_path, swarminit_filepath]:
if os.path.isfile(path2):
with open(path2, 'r') as swd:
next(swd)
for lines in swd:
if len(lines) == 0:
continue
(instance, offset) = lines.split("=")
if ((int)(instance) == (int)(cmd_opts.instance)):
(x, y, z, head) = offset.split(",")
g = mavextra.gps_offset((float)(lat), (float)(lon), (float)(x), (float)(y))
loc = str(g[0])+","+str(g[1])+","+str(alt+z)+","+str(head)
return loc
g = mavextra.gps_newpos((float)(lat), (float)(lon), 90, 20*(int)(cmd_opts.instance))
loc = str(g[0])+","+str(g[1])+","+str(alt)+","+str(heading)
return loc
loc.alt, # altitude
mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
m = self.mav.recv_match(type='MISSION_ACK', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get MISSION_ACK")
if m.type != mavutil.mavlink.MAV_MISSION_ERROR:
raise NotAchievedException("Did not get appropriate error")
self.start_subtest("Enter guided and flying somewhere constant")
self.change_mode("GUIDED")
self.mav.mav.mission_item_int_send(
target_system,
target_component,
0, # seq
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
2, # current - guided-mode request
0, # autocontinue
0, # p1
0, # p2
0, # p3
0, # p4
int(loc.lat *1e7), # latitude
int(loc.lng *1e7), # longitude
desired_relative_alt, # altitude
mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
m = self.mav.recv_match(type='MISSION_ACK', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get MISSION_ACK")
if m.type != mavutil.mavlink.MAV_MISSION_ACCEPTED:
raise NotAchievedException("Did not get accepted response")
self.wait_location(loc, accuracy=100) # based on loiter radius
raise NotAchievedException("Breached fence unexpectedly (%u)" %
(m.breach_status))
self.mavproxy.send('fence disable\n')
self.mavproxy.expect("fence disabled")
self.assert_fence_sys_status(True, False, True)
self.set_parameter("FENCE_ACTION", mavutil.mavlink.FENCE_ACTION_NONE)
self.assert_fence_sys_status(False, False, True)
self.set_parameter("FENCE_ACTION", mavutil.mavlink.FENCE_ACTION_RTL)
self.assert_fence_sys_status(True, False, True)
self.mavproxy.send("fence clear\n")
self.mavproxy.expect("fence removed")
if self.get_parameter("FENCE_TOTAL") != 0:
raise NotAchievedException("Expected zero points remaining")
self.assert_fence_sys_status(False, False, True)
self.progress("Trying to enable fence with no points")
self.do_fence_enable(want_result=mavutil.mavlink.MAV_RESULT_FAILED)
# test a rather unfortunate behaviour:
self.progress("Killing a live fence with fence-clear")
self.load_fence("CMAC-fence.txt")
self.set_parameter("FENCE_ACTION", mavutil.mavlink.FENCE_ACTION_RTL)
self.do_fence_enable()
self.assert_fence_sys_status(True, True, True)
self.mavproxy.send("fence clear\n")
self.mavproxy.expect("fence removed")
if self.get_parameter("FENCE_TOTAL") != 0:
raise NotAchievedException("Expected zero points remaining")
self.assert_fence_sys_status(False, False, True)
except Exception as e:
self.progress("Exception caught:")
self.progress(self.get_exception_stacktrace(e))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
from pymavlink import mavutil
master = mavutil.mavlink_connection("udp::14555", input=False, dialect="array_test")
while True:
master.mav.system_time_send(1,2)
master.mav.array_test_0_send(1, [-3, -2, -1, 0], [1,2,3,4], [5,6,7,8], [9,10,11,12])
master.mav.array_test_1_send([1,2,3,4])
master.mav.array_test_3_send(1, [2,3,4,5])
master.mav.array_test_4_send([1,2,3,4], 5)
master.mav.array_test_5_send("test1", "test2")
master.mav.array_test_6_send(1,2,3, [4,5], [6,7], [8,9], [10,11], [12,13], [14,15], "long value", [1.1, 2.2], [3.3, 4.4])
master.mav.array_test_7_send([1.1, 2.2], [3.3, 4.4],
[4,5], [6,7], [8,9], [10,11], [12,13], [14,15], "long value")
master.mav.array_test_8_send(1, [2.2, 3.3], [14,15])
time.sleep(1)
master.close()
def get_mavlink_connection_going(self):
# get a mavlink connection going
connection_string = self.autotest_connection_string_to_mavproxy()
try:
self.mav = mavutil.mavlink_connection(connection_string,
robust_parsing=True,
source_component=250)
except Exception as msg:
self.progress("Failed to start mavlink connection on %s: %s" %
(connection_string, msg,))
raise
self.mav.message_hooks.append(self.message_hook)
self.mav.idle_hooks.append(self.idle_hook)
sitl = util.start_SITL(binary, model='rover', home=home, speedup=10, gdb=False)
mavproxy = util.start_MAVProxy_SITL('APMrover2', options=options)
mavproxy.expect('Telemetry log: (\S+)')
logfile = mavproxy.match.group(1)
print("LOGFILE %s" % logfile)
mavproxy.expect('Received [0-9]+ parameters')
util.expect_setup_callback(mavproxy, expect_callback)
expect_list_clear()
expect_list_extend([sitl, mavproxy])
# get a mavlink connection going
try:
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception as msg:
print("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
mav.message_hooks.append(message_hook)
mav.idle_hooks.append(idle_hook)
try:
print("Waiting for a heartbeat with mavlink protocol %s" % mav.WIRE_PROTOCOL_VERSION)
mav.wait_heartbeat()
print("Setting up RC parameters")
setup_rc(mavproxy)
print("Waiting for GPS fix")
mav.wait_gps_fix()
homeloc = mav.location()
q_euler = Quaternion(q_test.euler)
assert(np.allclose(q_test.euler, euler) or
np.allclose(q_test.q, q_euler.q))
# construct q from a quaternion
q_test = Quaternion(q)
np.testing.assert_almost_equal(q_test.q, q)
q_test = Quaternion(q)
assert q_test.dcm.close(dcm)
q_test = Quaternion(q)
q_euler = Quaternion(q_test.euler)
assert(np.allclose(q_test.euler, euler) or
np.allclose(q_test.q, q_euler.q))
# # construct q from a euler angles
q_test = Quaternion(euler)
np.testing.assert_almost_equal(q_test.q, q)
q_test = Quaternion(euler)
assert q_test.dcm.close(dcm)
q_test = Quaternion(euler)
q_euler = Quaternion(q_test.euler)
assert(np.allclose(q_test.euler, euler) or
np.allclose(q_test.q, q_euler.q))
# # construct q from dcm (Matrix3 instance)
q_test = Quaternion(dcm)
np.testing.assert_almost_equal(q_test.q, q)
q_test = Quaternion(dcm)
assert q_test.dcm.close(dcm)
q_test = Quaternion(dcm)
q_euler = Quaternion(q_test.euler)
assert(np.allclose(q_test.euler, euler) or
def _helper_test_constructor(self, q, euler, dcm):
"""
Helper function for constructor test
Calls constructor for the quaternion from q euler and dcm and checks
if the resulting converions are equivalent to the arguments.
The test for the euler angles is weak as the solution is not unique
:param q: quaternion 4x1, [w, x, y, z]
:param euler: Vector3(roll, pitch, yaw), needs to be equivalent to q
:param q: Matrix3, needs to be equivalent to q
"""
# construct q from a Quaternion
quaternion_instance = Quaternion(q)
q_test = Quaternion(quaternion_instance)
np.testing.assert_almost_equal(q_test.q, q)
q_test = Quaternion(quaternion_instance)
assert q_test.dcm.close(dcm)
q_test = Quaternion(quaternion_instance)
q_euler = Quaternion(q_test.euler)
assert(np.allclose(q_test.euler, euler) or
np.allclose(q_test.q, q_euler.q))
# construct q from a QuaternionBase
quaternion_instance = QuaternionBase(q)
q_test = Quaternion(quaternion_instance)
np.testing.assert_almost_equal(q_test.q, q)
q_test = Quaternion(quaternion_instance)
assert q_test.dcm.close(dcm)
q_test = Quaternion(quaternion_instance)
q_euler = Quaternion(q_test.euler)