Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if not os.path.isdir(arg):
parser.error('The directory {} does not exist'.format(arg))
# File exists so return the directory
return arg
parser.add_argument('-o', '--output', dest='output', action='store',
help='Output directory (default is CWD)',
metavar='DIR', type=lambda x: is_valid_directory(parser, x))
parser.add_argument('-i', '--ignore', dest='ignore', action='store_true',
help='Ignore string parsing exceptions', default=False)
args = parser.parse_args()
ulog_file_name = args.filename
disable_str_exceptions = args.ignore
msg_filter = ['gps_dump']
ulog = ULog(ulog_file_name, msg_filter, disable_str_exceptions)
data = ulog.data_list
output_file_prefix = os.path.basename(ulog_file_name)
# strip '.ulg'
if output_file_prefix.lower().endswith('.ulg'):
output_file_prefix = output_file_prefix[:-4]
# write to different output path?
if args.output is not None:
output_file_prefix = os.path.join(args.output, output_file_prefix)
to_dev_filename = output_file_prefix+'_to_device.dat'
from_dev_filename = output_file_prefix+'_from_device.dat'
if len(data) == 0:
parser.add_argument('output_filename', metavar='params.txt',
type=argparse.FileType('w'), nargs='?',
help='Output filename (default=stdout)', default=sys.stdout)
parser.add_argument('--ignore', dest='ignore', action='store_true',
help='Ignore string parsing exceptions', default=False)
args = parser.parse_args()
ulog_file_name = args.filename
disable_str_exceptions = args.ignore
message_filter = []
if not args.initial: message_filter = None
ulog = ULog(ulog_file_name, message_filter, disable_str_exceptions)
param_keys = sorted(ulog.initial_parameters.keys())
delimiter = ','
output_file = args.output_filename
if args.format == "csv":
for param_key in param_keys:
output_file.write(param_key)
if args.timestamps:
output_file.write(delimiter)
output_file.write(str(ulog.initial_parameters[param_key]))
for t, name, value in ulog.changed_parameters:
if name == param_key:
output_file.write(delimiter)
output_file.write(str(value))
def initialize(self, data):
self.msg_size, self.msg_type = ULog._unpack_ushort_byte(data)
used_style = default_style
if style is not None:
for key in style:
used_style[key] = style[key]
if not isinstance(position_topic_name, list):
position_topic_name = [position_topic_name]
colors = [colors]
kml = simplekml.Kml()
load_topic_names = position_topic_name + ['vehicle_status']
if camera_trigger_topic_name is not None:
load_topic_names.append(camera_trigger_topic_name)
ulog = ULog(ulog_file_name, load_topic_names, disable_str_exceptions)
# get flight modes
try:
cur_dataset = ulog.get_dataset('vehicle_status')
flight_mode_changes = cur_dataset.list_value_changes('nav_state')
flight_mode_changes.append((ulog.last_timestamp, -1))
except (KeyError, IndexError) as error:
flight_mode_changes = []
# add the graphs
for topic, cur_colors in zip(position_topic_name, colors):
_kml_add_position_data(kml, ulog, topic, cur_colors, used_style,
altitude_offset, minimum_interval_s, flight_mode_changes)
# camera triggers
_kml_add_camera_triggers(kml, ulog, camera_trigger_topic_name, altitude_offset)
parser = argparse.ArgumentParser(description='Display information from an ULog file')
parser.add_argument('filename', metavar='file.ulg', help='ULog input file')
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
help='Verbose output', default=False)
parser.add_argument('-m', '--message', dest='message',
help='Show a specific Info Multiple Message')
parser.add_argument('-n', '--newline', dest='newline', action='store_true',
help='Add newline separators (only with --message)', default=False)
parser.add_argument('-i', '--ignore', dest='ignore', action='store_true',
help='Ignore string parsing exceptions', default=False)
args = parser.parse_args()
ulog_file_name = args.filename
disable_str_exceptions = args.ignore
ulog = ULog(ulog_file_name, None, disable_str_exceptions)
message = args.message
if message:
separator = ""
if args.newline: separator = "\n"
if len(ulog.msg_info_multiple_dict) > 0 and message in ulog.msg_info_multiple_dict:
message_info_multiple = ulog.msg_info_multiple_dict[message]
for i, m in enumerate(message_info_multiple):
print("# {} {}:".format(message, i))
print(separator.join(m))
else:
print("message {} not found".format(message))
else:
show_info(ulog, args.verbose)
def load_data(self):
log_data = ULog(str(self.log_file_name))
self.log_info_data = {index:value for index,value in log_data.msg_info_dict.items() if 'perf_' not in index}
self.log_info_data['SW version'] = log_data.get_version_info_str()
self.log_params_data = log_data.initial_parameters
self.log_params_data = OrderedDict([(key, self.log_params_data[key]) for key in sorted(self.log_params_data)])
self.log_data_list = log_data.data_list
self.data_dict = OrderedDict()
for d in self.log_data_list:
data_items_list = [f.field_name for f in d.field_data]
data_items_list.remove('timestamp')
data_items_list.insert(0, 'timestamp')
data_items = [(item, str(d.data[item].dtype), str(len(d.data[item]))) for item in data_items_list]
# add suffix to distinguish same name
i = 0
name = d.name
while True:
if i > 0: