Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
flightmode = None
while True:
m = mlog.recv_match()
if m is None:
break
if args.link is not None and m._link != args.link:
continue
mtype = m.get_type()
if mtype in messages:
if older_message(m, messages[mtype]):
continue
# we don't use mlog.flightmode as that can be wrong if we are extracting a single link
if mtype == 'HEARTBEAT' and m.get_srcComponent() != mavutil.mavlink.MAV_COMP_ID_GIMBAL and m.type != mavutil.mavlink.MAV_TYPE_GCS:
flightmode = mavutil.mode_string_v10(m).upper()
if mtype == 'MODE':
flightmode = mlog.flightmode
if (isbin or islog) and m.get_type() in ["FMT", "PARM", "CMD"]:
file_header += m.get_msgbuf()
if (isbin or islog) and m.get_type() == 'MSG' and m.Message.startswith("Ardu"):
file_header += m.get_msgbuf()
if m.get_type() in ['PARAM_VALUE','MISSION_ITEM','MISSION_ITEM_INT']:
timestamp = getattr(m, '_timestamp', None)
file_header += struct.pack('>Q', int(timestamp*1.0e6)) + m.get_msgbuf()
if not mavutil.evaluate_condition(args.condition, mlog.messages):
continue
if flightmode in modes:
def component_type_string(self, hb):
# note that we rely on vehicle_type_string for basic vehicle types
if hb.type == mavutil.mavlink.MAV_TYPE_GCS:
return "GCS"
elif hb.type == mavutil.mavlink.MAV_TYPE_GIMBAL:
return "Gimbal"
elif hb.type == mavutil.mavlink.MAV_TYPE_ONBOARD_CONTROLLER:
return "CC"
elif hb.type == mavutil.mavlink.MAV_TYPE_GENERIC:
return "Generic"
return self.vehicle_type_string(hb)
def listener(_):
# Send 1 heartbeat per second
if monotonic.monotonic() - self._heartbeat_lastsent > 1:
self._master.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS,
mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0)
self._heartbeat_lastsent = monotonic.monotonic()
# Timeouts.
if self._heartbeat_started:
if self._heartbeat_error and monotonic.monotonic() - self._heartbeat_lastreceived > self._heartbeat_error > 0:
raise APIException('No heartbeat in %s seconds, aborting.' %
self._heartbeat_error)
elif monotonic.monotonic() - self._heartbeat_lastreceived > self._heartbeat_warning:
if self._heartbeat_timeout is False:
self._logger.warning('Link timeout, no heartbeat in last %s seconds' % self._heartbeat_warning)
self._heartbeat_timeout = True
sys.stdout.write(cur_line)
elif ord(ch) > 3:
cur_line += ch
sys.stdout.write(ch)
sys.stdout.flush()
data = mav_serialport.read(4096)
if data and len(data) > 0:
sys.stdout.write(data)
sys.stdout.flush()
# handle heartbeat sending
heartbeat_time = timer()
if heartbeat_time > next_heartbeat_time:
mav_serialport.mav.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS,
mavutil.mavlink.MAV_AUTOPILOT_GENERIC, 0, 0, 0)
next_heartbeat_time = heartbeat_time + 1
except serial.serialutil.SerialException as e:
print(e)
except KeyboardInterrupt:
mav_serialport.close()
finally:
termios.tcsetattr(fd_in, termios.TCSADRAIN, old_attr)
def mode(HEARTBEAT):
'''return flight mode number'''
from pymavlink import mavutil
if HEARTBEAT.type == mavutil.mavlink.MAV_TYPE_GCS:
return None
return HEARTBEAT.custom_mode
# NOTE: this returns a mavlink message
# this function should not be called outside of this class!
msg = self._master.recv_match(blocking=True, timeout=1)
if msg is None:
# no message received
return None
else:
if (msg.get_type() == 'BAD_DATA'):
# no message that is useful
return None
# send a heartbeat message back, since this needs to be constantly sent so the autopilot knows this exists
if msg.get_type() == 'HEARTBEAT':
# send -> type, autopilot, base mode, custom mode, system status
self._master.mav.heartbeat_send(
mavutil.mavlink.MAV_TYPE_GCS, mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0,
mavutil.mavlink.MAV_STATE_ACTIVE
)
# pass the message along to be handled by this class
return msg
# Log the first GPS location found, being sure to skip GPS fixes
# that are bad (at lat/lon of 0,0)
if first_gps_msg is None:
first_gps_msg = m
# Track the distance travelled, being sure to skip GPS fixes
# that are bad (at lat/lon of 0,0)
if last_gps_msg is None or m.time_usec > last_gps_msg.time_usec or m.time_usec+30e6 < last_gps_msg.time_usec:
if last_gps_msg is not None:
total_dist += distance_two(last_gps_msg, m)
# Save this GPS message to do simple distance calculations with
last_gps_msg = m
elif m.get_type() == 'HEARTBEAT':
if m.type == mavutil.mavlink.MAV_TYPE_GCS:
continue
if (m.base_mode & mavutil.mavlink.MAV_MODE_FLAG_GUIDED_ENABLED or
m.base_mode & mavutil.mavlink.MAV_MODE_FLAG_AUTO_ENABLED) and not autonomous:
autonomous = True
autonomous_sections += 1
start_auto_time = timestamp
elif (not m.base_mode & mavutil.mavlink.MAV_MODE_FLAG_GUIDED_ENABLED and
not m.base_mode & mavutil.mavlink.MAV_MODE_FLAG_AUTO_ENABLED) and autonomous:
autonomous = False
auto_time += timestamp - start_auto_time
# If there were no messages processed, say so
if start_time is None:
print("ERROR: No messages found.")
return
def send_heartbeat(master):
if master.mavlink10():
master.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS, mavutil.mavlink.MAV_AUTOPILOT_INVALID,
0, 0, 0)
else:
MAV_GROUND = 5
MAV_AUTOPILOT_NONE = 4
master.mav.heartbeat_send(MAV_GROUND, MAV_AUTOPILOT_NONE)
"""
last_msg_time = time.time()
async with websockets.connect(self._uri) as ws:
self._ws = ws
while self._running:
msg = await ws.recv()
msg = self.decode_message(msg)
if msg is None or msg.get_type() == 'BAD_DATA':
continue
# send a heartbeat message back, since this needs to be
# constantly sent so the autopilot knows this exists
if msg.get_type() == 'HEARTBEAT':
# send -> type, autopilot, base mode, custom mode, system status
outmsg = self._mav.heartbeat_encode(mavutil.mavlink.MAV_TYPE_GCS,
mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0,
mavutil.mavlink.MAV_STATE_ACTIVE)
await self.send_message(outmsg)
# print('Message received', msg)
current_time = time.time()
# print("Time between messages", current_time - last_msg_time)
# If we haven't heard a message in a given amount of time
# terminate connection and event loop.
if current_time - last_msg_time > self._timeout:
self.notify_message_listeners(MsgID.CONNECTION_CLOSED, 0)
self.stop()
# update the time of the last message
def mavlink_packet(self, m):
mtype = m.get_type()
if mtype == 'HEARTBEAT' and m.type != mavutil.mavlink.MAV_TYPE_GCS:
armed = self.master.motors_armed()
if armed != self.was_armed:
self.was_armed = armed
if armed and not self.all_checks_enabled():
self.say("Arming checks disabled")
ice_enable = self.get_mav_param('ICE_ENABLE', 0)
if ice_enable == 1:
rc = self.master.messages["RC_CHANNELS"]
v = self.mav_param.get('ICE_START_CHAN', None)
if v is None:
return
v = getattr(rc, 'chan%u_raw' % v)
if v <= 1300:
self.say("ICE Disabled")