Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def lazrs_compress_points(points_data, parallel=True):
    """Compress the raw bytes of ``points_data`` with the lazrs backend.

    The point format id and the number of extra bytes are read from
    ``points_data.point_format`` to build the LAZ VLR, and
    ``points_data.array`` is viewed as a flat ``uint8`` buffer before being
    handed to ``lazrs.compress_points``.

    Returns a 2-tuple: the value returned by ``lazrs.compress_points`` and
    the VLR's ``record_data()``.

    Raises ``LazError`` when lazrs cannot be imported or when lazrs raises
    ``LazrsError`` while building the VLR or compressing.
    """
    # lazrs is an optional dependency, so it is imported on demand.
    try:
        import lazrs
    except Exception as import_err:
        raise LazError("lazrs is not installed") from import_err

    try:
        format_id = points_data.point_format.id
        extra_bytes_count = points_data.point_format.num_extra_bytes
        laz_vlr = lazrs.LazVlr.new_for_compression(format_id, extra_bytes_count)
        raw_bytes = np.frombuffer(points_data.array, np.uint8)
        compressed = lazrs.compress_points(laz_vlr, raw_bytes, parallel)
    except lazrs.LazrsError as laz_err:
        raise LazError("lazrs error: {}".format(laz_err)) from laz_err

    # Only reached on success; record_data() is deliberately outside the try.
    return compressed, laz_vlr.record_data()
def __init__(self, dest: BinaryIO, point_format: PointFormat, parallel: bool):
    """Store the destination stream and prepare the LAZ VLR for compression.

    ``dest`` is the stream later handed to the lazrs compressor,
    ``point_format`` supplies the format id and number of extra bytes used
    to build the VLR, and ``parallel`` selects which compressor class
    ``write_initial_header_and_vlrs`` will instantiate.
    """
    self.dest = dest
    format_id = point_format.id
    extra_bytes_count = point_format.num_extra_bytes
    self.vlr = lazrs.LazVlr.new_for_compression(format_id, extra_bytes_count)
    self.parallel = parallel
    # No compressor yet: it is created in write_initial_header_and_vlrs.
    self.compressor = None
def write_initial_header_and_vlrs(self, header, vlrs: VLRList):
    """Append the LasZip VLR, write header and VLRs, then build the compressor.

    ``vlrs`` is modified in place: a ``LasZipVlr`` wrapping this writer's
    VLR record data is appended before delegating to the parent
    implementation.
    """
    vlrs.append(LasZipVlr(self.vlr.record_data()))
    super().write_initial_header_and_vlrs(header, vlrs)

    # The compressor can only be created now: when it is constructed it
    # writes the offset to the chunk table, so the header and the VLRs
    # must already be in the destination.
    compressor_cls = (
        lazrs.ParLasZipCompressor if self.parallel else lazrs.LasZipCompressor
    )
    self.compressor = compressor_cls(self.dest, self.vlr)
def write_initial_header_and_vlrs(self, header, vlrs: VLRList):
    """Append the LasZip VLR, write header and VLRs, then build the compressor.

    ``vlrs`` is modified in place: a ``LasZipVlr`` wrapping this writer's
    VLR record data is appended before delegating to the parent
    implementation.
    """
    vlrs.append(LasZipVlr(self.vlr.record_data()))
    super().write_initial_header_and_vlrs(header, vlrs)

    # The compressor can only be created now: when it is constructed it
    # writes the offset to the chunk table, so the header and the VLRs
    # must already be in the destination.
    compressor_cls = (
        lazrs.ParLasZipCompressor if self.parallel else lazrs.LasZipCompressor
    )
    self.compressor = compressor_cls(self.dest, self.vlr)
def lazrs_compress_points(points_data, parallel=True):
    """Compress the raw bytes of ``points_data`` with the lazrs backend.

    The point format id and the number of extra bytes are read from
    ``points_data.point_format`` to build the LAZ VLR, and
    ``points_data.array`` is viewed as a flat ``uint8`` buffer before being
    handed to ``lazrs.compress_points``.

    Returns a 2-tuple: the value returned by ``lazrs.compress_points`` and
    the VLR's ``record_data()``.

    Raises ``LazError`` when lazrs cannot be imported or when lazrs raises
    ``LazrsError`` while building the VLR or compressing.
    """
    # lazrs is an optional dependency, so it is imported on demand.
    try:
        import lazrs
    except Exception as import_err:
        raise LazError("lazrs is not installed") from import_err

    try:
        format_id = points_data.point_format.id
        extra_bytes_count = points_data.point_format.num_extra_bytes
        laz_vlr = lazrs.LazVlr.new_for_compression(format_id, extra_bytes_count)
        raw_bytes = np.frombuffer(points_data.array, np.uint8)
        compressed = lazrs.compress_points(laz_vlr, raw_bytes, parallel)
    except lazrs.LazrsError as laz_err:
        raise LazError("lazrs error: {}".format(laz_err)) from laz_err

    # Only reached on success; record_data() is deliberately outside the try.
    return compressed, laz_vlr.record_data()
def lazrs_decompress_buffer(
    compressed_buffer, point_size, point_count, laszip_vlr, parallel=True
):
    """Decompress a buffer of LAZ-compressed points with the lazrs backend.

    ``compressed_buffer`` and ``laszip_vlr.record_data`` are both viewed as
    flat ``uint8`` arrays. A zero-initialised ``uint8`` array of
    ``point_count * point_size`` bytes is allocated and passed to
    ``lazrs.decompress_points`` as its third argument; that array is what
    gets returned.

    Raises ``LazError`` when lazrs cannot be imported or when lazrs raises
    ``LazrsError`` during decompression.
    """
    # lazrs is an optional dependency, so it is imported on demand.
    try:
        import lazrs
    except Exception as import_err:
        raise LazError("lazrs is not installed") from import_err

    try:
        compressed_bytes = np.frombuffer(compressed_buffer, dtype=np.uint8)
        vlr_bytes = np.frombuffer(laszip_vlr.record_data, dtype=np.uint8)
        total_size = point_count * point_size
        decompressed = np.zeros(total_size, np.uint8)
        lazrs.decompress_points(compressed_bytes, vlr_bytes, decompressed, parallel)
    except lazrs.LazrsError as laz_err:
        raise LazError("lazrs error: {}".format(laz_err)) from laz_err

    return decompressed
def lazrs_decompress_buffer(
    compressed_buffer, point_size, point_count, laszip_vlr, parallel=True
):
    """Decompress a buffer of LAZ-compressed points with the lazrs backend.

    ``compressed_buffer`` and ``laszip_vlr.record_data`` are both viewed as
    flat ``uint8`` arrays. A zero-initialised ``uint8`` array of
    ``point_count * point_size`` bytes is allocated and passed to
    ``lazrs.decompress_points`` as its third argument; that array is what
    gets returned.

    Raises ``LazError`` when lazrs cannot be imported or when lazrs raises
    ``LazrsError`` during decompression.
    """
    # lazrs is an optional dependency, so it is imported on demand.
    try:
        import lazrs
    except Exception as import_err:
        raise LazError("lazrs is not installed") from import_err

    try:
        compressed_bytes = np.frombuffer(compressed_buffer, dtype=np.uint8)
        vlr_bytes = np.frombuffer(laszip_vlr.record_data, dtype=np.uint8)
        total_size = point_count * point_size
        decompressed = np.zeros(total_size, np.uint8)
        lazrs.decompress_points(compressed_bytes, vlr_bytes, decompressed, parallel)
    except lazrs.LazrsError as laz_err:
        raise LazError("lazrs error: {}".format(laz_err)) from laz_err

    return decompressed