Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, data, header, is_info_multiple=False):
if is_info_multiple: # INFO_MULTIPLE message
self.is_continued, = struct.unpack('
def __init__(self, data, header, is_info_multiple=False):
if is_info_multiple: # INFO_MULTIPLE message
self.is_continued, = struct.unpack('
def __init__(self, data, header):
format_arr = ULog.parse_string(data).split(':')
self.name = format_arr[0]
types_str = format_arr[1].split(';')
self.fields = [] # list of tuples (type, array_size, name)
for t in types_str:
if len(t) > 0:
self.fields.append(self._extract_type(t))
if not os.path.isdir(arg):
parser.error('The directory {} does not exist'.format(arg))
# File exists so return the directory
return arg
parser.add_argument('-o', '--output', dest='output', action='store',
help='Output directory (default is CWD)',
metavar='DIR', type=lambda x: is_valid_directory(parser, x))
parser.add_argument('-i', '--ignore', dest='ignore', action='store_true',
help='Ignore string parsing exceptions', default=False)
args = parser.parse_args()
ulog_file_name = args.filename
disable_str_exceptions = args.ignore
msg_filter = ['gps_dump']
ulog = ULog(ulog_file_name, msg_filter, disable_str_exceptions)
data = ulog.data_list
output_file_prefix = os.path.basename(ulog_file_name)
# strip '.ulg'
if output_file_prefix.lower().endswith('.ulg'):
output_file_prefix = output_file_prefix[:-4]
# write to different output path?
if args.output is not None:
output_file_prefix = os.path.join(args.output, output_file_prefix)
to_dev_filename = output_file_prefix+'_to_device.dat'
from_dev_filename = output_file_prefix+'_from_device.dat'
if len(data) == 0:
parser.add_argument('output_filename', metavar='params.txt',
type=argparse.FileType('w'), nargs='?',
help='Output filename (default=stdout)', default=sys.stdout)
parser.add_argument('--ignore', dest='ignore', action='store_true',
help='Ignore string parsing exceptions', default=False)
args = parser.parse_args()
ulog_file_name = args.filename
disable_str_exceptions = args.ignore
message_filter = []
if not args.initial: message_filter = None
ulog = ULog(ulog_file_name, message_filter, disable_str_exceptions)
param_keys = sorted(ulog.initial_parameters.keys())
delimiter = ','
output_file = args.output_filename
if args.format == "csv":
for param_key in param_keys:
output_file.write(param_key)
if args.timestamps:
output_file.write(delimiter)
output_file.write(str(ulog.initial_parameters[param_key]))
for t, name, value in ulog.changed_parameters:
if name == param_key:
output_file.write(delimiter)
output_file.write(str(value))
def _parse_nested_type(self, prefix_str, type_name, message_formats):
# we flatten nested types
message_format = message_formats[type_name]
for (type_name_fmt, array_size, field_name) in message_format.fields:
if type_name_fmt in ULog._UNPACK_TYPES:
if array_size > 1:
for i in range(array_size):
self.field_data.append(ULog._FieldData(
prefix_str+field_name+'['+str(i)+']', type_name_fmt))
else:
self.field_data.append(ULog._FieldData(
prefix_str+field_name, type_name_fmt))
if prefix_str+field_name == 'timestamp':
self.timestamp_idx = len(self.field_data) - 1
else: # nested type
if array_size > 1:
for i in range(array_size):
self._parse_nested_type(prefix_str+field_name+'['+str(i)+'].',
type_name_fmt, message_formats)
else:
self._parse_nested_type(prefix_str+field_name+'.',
type_name_fmt, message_formats)
if self._has_sync:
self._find_sync(header.msg_size)
except IndexError:
if not self._file_corrupt:
print("File corruption detected while reading file data!")
self._file_corrupt = True
except struct.error:
pass #we read past the end of the file
# convert into final representation
while self._subscriptions:
_, value = self._subscriptions.popitem()
if len(value.buffer) > 0: # only add if we have data
data_item = ULog.Data(value)
self._data_list.append(data_item)
def initialize(self, data):
self.msg_size, self.msg_type = ULog._unpack_ushort_byte(data)
used_style = default_style
if style is not None:
for key in style:
used_style[key] = style[key]
if not isinstance(position_topic_name, list):
position_topic_name = [position_topic_name]
colors = [colors]
kml = simplekml.Kml()
load_topic_names = position_topic_name + ['vehicle_status']
if camera_trigger_topic_name is not None:
load_topic_names.append(camera_trigger_topic_name)
ulog = ULog(ulog_file_name, load_topic_names, disable_str_exceptions)
# get flight modes
try:
cur_dataset = ulog.get_dataset('vehicle_status')
flight_mode_changes = cur_dataset.list_value_changes('nav_state')
flight_mode_changes.append((ulog.last_timestamp, -1))
except (KeyError, IndexError) as error:
flight_mode_changes = []
# add the graphs
for topic, cur_colors in zip(position_topic_name, colors):
_kml_add_position_data(kml, ulog, topic, cur_colors, used_style,
altitude_offset, minimum_interval_s, flight_mode_changes)
# camera triggers
_kml_add_camera_triggers(kml, ulog, camera_trigger_topic_name, altitude_offset)
parser = argparse.ArgumentParser(description='Display information from an ULog file')
parser.add_argument('filename', metavar='file.ulg', help='ULog input file')
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
help='Verbose output', default=False)
parser.add_argument('-m', '--message', dest='message',
help='Show a specific Info Multiple Message')
parser.add_argument('-n', '--newline', dest='newline', action='store_true',
help='Add newline separators (only with --message)', default=False)
parser.add_argument('-i', '--ignore', dest='ignore', action='store_true',
help='Ignore string parsing exceptions', default=False)
args = parser.parse_args()
ulog_file_name = args.filename
disable_str_exceptions = args.ignore
ulog = ULog(ulog_file_name, None, disable_str_exceptions)
message = args.message
if message:
separator = ""
if args.newline: separator = "\n"
if len(ulog.msg_info_multiple_dict) > 0 and message in ulog.msg_info_multiple_dict:
message_info_multiple = ulog.msg_info_multiple_dict[message]
for i, m in enumerate(message_info_multiple):
print("# {} {}:".format(message, i))
print(separator.join(m))
else:
print("message {} not found".format(message))
else:
show_info(ulog, args.verbose)