Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if __name__ == "__main__":
o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Debug)
path = "[path_to_reconstruction_system_output]"
out_path = "[path_to_sampled_frames_are_located]"
make_clean_folder(out_path)
make_clean_folder(os.path.join(out_path, "depth/"))
make_clean_folder(os.path.join(out_path, "image/"))
make_clean_folder(os.path.join(out_path, "scene/"))
sampling_rate = 30
depth_image_path = get_file_list(os.path.join(path, "depth/"),
extension=".png")
color_image_path = get_file_list(os.path.join(path, "image/"),
extension=".jpg")
pose_graph_global = o3d.io.read_pose_graph(
os.path.join(path, template_global_posegraph_optimized))
n_fragments = len(depth_image_path) // n_frames_per_fragment + 1
pose_graph_fragments = []
for i in range(n_fragments):
pose_graph_fragment = o3d.io.read_pose_graph(
os.path.join(path, template_fragment_posegraph_optimized % i))
pose_graph_fragments.append(pose_graph_fragment)
depth_image_path_new = []
color_image_path_new = []
traj = []
cnt = 0
for i in range(len(depth_image_path)):
if i % sampling_rate == 0:
metadata = [cnt, cnt, len(depth_image_path) // sampling_rate + 1]
print(metadata)
def run_posegraph_optimization(pose_graph_name, pose_graph_optimized_name,
max_correspondence_distance,
preference_loop_closure):
# to display messages from o3d.registration.global_optimization
o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Debug)
method = o3d.registration.GlobalOptimizationLevenbergMarquardt()
criteria = o3d.registration.GlobalOptimizationConvergenceCriteria()
option = o3d.registration.GlobalOptimizationOption(
max_correspondence_distance=max_correspondence_distance,
edge_prune_threshold=0.25,
preference_loop_closure=preference_loop_closure,
reference_node=0)
pose_graph = o3d.io.read_pose_graph(pose_graph_name)
o3d.registration.global_optimization(pose_graph, method, criteria, option)
o3d.io.write_pose_graph(pose_graph_optimized_name, pose_graph)
o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Error)
if (args.path_intrinsic):
pinhole_camera_intrinsic = o3d.io.read_pinhole_camera_intrinsic(
args.path_intrinsic)
else:
pinhole_camera_intrinsic = \
o3d.camera.PinholeCameraIntrinsic(
o3d.camera.PinholeCameraIntrinsicParameters.PrimeSenseDefault)
pcds = []
[color_files, depth_files] = \
get_rgbd_file_lists(config["path_dataset"])
n_files = len(color_files)
n_fragments = int(math.ceil(float(n_files) / \
config['n_frames_per_fragment']))
sid = int(args.fragment) * config['n_frames_per_fragment']
eid = min(sid + config['n_frames_per_fragment'], n_files)
pose_graph = o3d.io.read_pose_graph(join(config["path_dataset"],
config["template_fragment_posegraph_optimized"] % \
int(args.fragment)))
for i in range(sid, eid):
print("appending rgbd image %d" % i)
rgbd_image = read_rgbd_image(color_files[i], depth_files[i],
False, config)
pcd_i = o3d.geometry.PointCloud.create_from_rgbd_image(
rgbd_image, pinhole_camera_intrinsic,
np.linalg.inv(pose_graph.nodes[i - sid].pose))
pcd_i_down = pcd_i.voxel_down_sample(config["voxel_size"])
pcds.append(pcd_i_down)
draw_geometries_flip(pcds)
def scalable_integrate_rgb_frames(path_dataset, intrinsic, config):
poses = []
[color_files, depth_files] = get_rgbd_file_lists(path_dataset)
n_files = len(color_files)
n_fragments = int(math.ceil(float(n_files) / \
config['n_frames_per_fragment']))
volume = o3d.integration.ScalableTSDFVolume(
voxel_length=config["tsdf_cubic_size"] / 512.0,
sdf_trunc=0.04,
color_type=o3d.integration.TSDFVolumeColorType.RGB8)
pose_graph_fragment = o3d.io.read_pose_graph(
join(path_dataset, config["template_refined_posegraph_optimized"]))
for fragment_id in range(len(pose_graph_fragment.nodes)):
pose_graph_rgbd = o3d.io.read_pose_graph(
join(path_dataset,
config["template_fragment_posegraph_optimized"] % fragment_id))
for frame_id in range(len(pose_graph_rgbd.nodes)):
frame_id_abs = fragment_id * \
config['n_frames_per_fragment'] + frame_id
print(
"Fragment %03d / %03d :: integrate rgbd frame %d (%d of %d)." %
(fragment_id, n_fragments - 1, frame_id_abs, frame_id + 1,
len(pose_graph_rgbd.nodes)))
rgbd = read_rgbd_image(color_files[frame_id_abs],
depth_files[frame_id_abs], False, config)
pose = np.dot(pose_graph_fragment.nodes[fragment_id].pose,
pose_graph_rgbd.nodes[frame_id].pose)
volume.integrate(rgbd, intrinsic, np.linalg.inv(pose))
poses.append(pose)
def compute_initial_registration(s, t, source_down, target_down, source_fpfh,
target_fpfh, path_dataset, config):
if t == s + 1: # odometry case
print("Using RGBD odometry")
pose_graph_frag = o3d.io.read_pose_graph(
join(path_dataset,
config["template_fragment_posegraph_optimized"] % s))
n_nodes = len(pose_graph_frag.nodes)
transformation_init = np.linalg.inv(pose_graph_frag.nodes[n_nodes -
1].pose)
(transformation, information) = \
multiscale_icp(source_down, target_down,
[config["voxel_size"]], [50], config, transformation_init)
else: # loop closure case
(success, transformation,
information) = register_point_cloud_fpfh(source_down, target_down,
source_fpfh, target_fpfh,
config)
if not success:
print("No resonable solution. Skip this pair")
return (False, np.identity(4), np.zeros((6, 6)))
def make_posegraph_for_refined_scene(ply_file_names, config):
pose_graph = o3d.io.read_pose_graph(
join(config["path_dataset"],
config["template_global_posegraph_optimized"]))
n_files = len(ply_file_names)
matching_results = {}
for edge in pose_graph.edges:
s = edge.source_node_id
t = edge.target_node_id
matching_results[s * n_files + t] = \
matching_result(s, t, edge.transformation)
if config["python_multi_threading"]:
from joblib import Parallel, delayed
import multiprocessing
import subprocess
MAX_THREAD = min(multiprocessing.cpu_count(),
o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Debug)
print("")
print("Parameters for o3d.registration.PoseGraph optimization ...")
method = o3d.registration.GlobalOptimizationLevenbergMarquardt()
criteria = o3d.registration.GlobalOptimizationConvergenceCriteria()
option = o3d.registration.GlobalOptimizationOption()
print("")
print(method)
print(criteria)
print(option)
print("")
print("Optimizing Fragment o3d.registration.PoseGraph using open3d ...")
data_path = "../../TestData/GraphOptimization/"
pose_graph_fragment = o3d.io.read_pose_graph(
data_path + "pose_graph_example_fragment.json")
print(pose_graph_fragment)
o3d.registration.global_optimization(pose_graph_fragment, method, criteria,
option)
o3d.io.write_pose_graph(
data_path + "pose_graph_example_fragment_optimized.json",
pose_graph_fragment)
print("")
print("Optimizing Global o3d.registration.PoseGraph using open3d ...")
pose_graph_global = o3d.io.read_pose_graph(data_path +
"pose_graph_example_global.json")
print(pose_graph_global)
o3d.registration.global_optimization(pose_graph_global, method, criteria,
option)
o3d.io.write_pose_graph(
print("")
print("Optimizing Fragment o3d.registration.PoseGraph using open3d ...")
data_path = "../../TestData/GraphOptimization/"
pose_graph_fragment = o3d.io.read_pose_graph(
data_path + "pose_graph_example_fragment.json")
print(pose_graph_fragment)
o3d.registration.global_optimization(pose_graph_fragment, method, criteria,
option)
o3d.io.write_pose_graph(
data_path + "pose_graph_example_fragment_optimized.json",
pose_graph_fragment)
print("")
print("Optimizing Global o3d.registration.PoseGraph using open3d ...")
pose_graph_global = o3d.io.read_pose_graph(data_path +
"pose_graph_example_global.json")
print(pose_graph_global)
o3d.registration.global_optimization(pose_graph_global, method, criteria,
option)
o3d.io.write_pose_graph(
data_path + "pose_graph_example_global_optimized.json",
pose_graph_global)
print("")
def integrate_rgb_frames_for_fragment(color_files, depth_files, fragment_id,
n_fragments, pose_graph_name, intrinsic,
config):
pose_graph = o3d.io.read_pose_graph(pose_graph_name)
volume = o3d.integration.ScalableTSDFVolume(
voxel_length=config["tsdf_cubic_size"] / 512.0,
sdf_trunc=0.04,
color_type=o3d.integration.TSDFVolumeColorType.RGB8)
for i in range(len(pose_graph.nodes)):
i_abs = fragment_id * config['n_frames_per_fragment'] + i
print(
"Fragment %03d / %03d :: integrate rgbd frame %d (%d of %d)." %
(fragment_id, n_fragments - 1, i_abs, i + 1, len(pose_graph.nodes)))
rgbd = read_rgbd_image(color_files[i_abs], depth_files[i_abs], False,
config)
pose = pose_graph.nodes[i].pose
volume.integrate(rgbd, intrinsic, np.linalg.inv(pose))
mesh = volume.extract_triangle_mesh()
mesh.compute_vertex_normals()
return mesh