Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup_write(self,header, vlrs, evlrs):
self._header_current = False
if header == False:
raise laspy.util.LaspyException("Write mode requires a valid header object.")
## No file to store data yet.
self.has_point_records = False
self.data_provider.open("w+b")
self.header_format = header.format
self._header = header
self.header = laspy.header.HeaderManager(header = header, reader = self)
self.initialize_file_padding(vlrs)
## We have a file to store data now.
self.data_provider.remap()
self.header.flush()
self.correct_rec_len()
if not vlrs in [[], False]:
self.set_vlrs(vlrs)
else:
def get_start_wavefm_data_record(self):
if not self.version in ("1.3", "1.4"):
raise util.LaspyException("Waveform data not present in version: " + self.version)
return(self.reader.get_header_property("start_wavefm_data_rec"))
def set_overlap(self, overlap):
'''Set the binary field withheld inside the raw classification byte'''
if self.header.data_format_id in (6,7,8,9,10):
class_byte = self.get_raw_classification_flags()
self.raise_if_overflow(overlap, 1)
outbyte = self.bitpack((class_byte, overlap, class_byte), ((0,3),(0,1), (4,8)))
self.set_raw_classification_flags(outbyte)
else:
raise laspy.util.LaspyException("Overlap only present in point formats > 5.")
return
def verify_num_vlrs(self):
headervlrs = self.get_header_property("num_variable_len_recs")
calc_headervlrs = len(self.vlrs)
if headervlrs != calc_headervlrs:
raise laspy.util.LaspyException('''Number of EVLRs provided does not match the number
specified in the header. (copied headers do not maintain
references to their EVLRs, that might be your problem.
You can pass them explicitly to the File constructor.)''')
if self.header.version == "1.4":
calc_headerevlrs = len(self.evlrs)
headerevlrs = self.get_header_property("num_evlrs")
if headerevlrs != calc_headerevlrs:
raise laspy.util.LaspyException('''Number of EVLRs provided does not match the number
specified in the header. (copied headers do not maintain
def map(self):
'''Memory map the file'''
if self.fileref == False and not self.compressed:
raise laspy.util.LaspyException("File not opened.")
if self.mode in ("r", "r-"):
if self.compressed and self.mode != "r-" and not HAVE_LAZPERF:
self._mmap = FakeMmap(self.filename)
else:
self._mmap = mmap.mmap(self.fileref.fileno(), 0, access = mmap.ACCESS_READ)
elif self.mode in ("w", "rw"):
self._mmap = mmap.mmap(self.fileref.fileno(), 0, access = mmap.ACCESS_WRITE)
else:
raise laspy.util.LaspyException("Invalid Mode: " + str(self.mode))
def raise_if_overflow(self, arr, maximum_packed_length):
if np.any(np.array(arr) >= 2**maximum_packed_length):
raise laspy.util.LaspyException("Invalid Data: Packed Length is Greater than allowed.")
self.seek(0, rel=False)
dat_part_1 = self.data_provider._mmap.read(self.vlr_stop)
self.seek(old_offset, rel = False)
dat_part_2 = self.data_provider._mmap.read(len(self.data_provider._mmap) - old_offset)
self.data_provider.close()
self.data_provider.open("w+b")
self.data_provider.fileref.write(dat_part_1)
self.data_provider.fileref.write(b"\x00"*value)
self.data_provider.fileref.write(dat_part_2)
self.data_provider.close()
self.__init__(self.data_provider.filename, self.mode)
return(len(self.data_provider._mmap))
elif self.mode == "r+":
pass
else:
raise(laspy.util.LaspyException("Must be in write mode to change padding."))
return(len(self.data_provider._mmap))
def set_vlrs(self, value):
if value == False or len(value) == 0:
return
if not all([x.isVLR for x in value]):
raise laspy.util.LaspyException("set_vlrs requers an iterable object " +
"composed of :obj:`laspy.header.VLR` objects.")
elif self.mode == "w+":
raise NotImplementedError
elif self.mode == "rw":
current_size = self.data_provider._mmap.size()
current_padding = self.get_padding()
old_offset = self.header.data_offset
new_offset = current_padding + self.header.header_size + sum([len(x) for x in value])
self.set_header_property("data_offset", new_offset)
self.set_header_property("num_variable_len_recs", len(value))
self.data_provider.fileref.seek(0, 0)
dat_part_1 = self.data_provider.fileref.read(self.header.header_size)
self.data_provider.fileref.seek(old_offset, 0)
dat_part_2 = self.data_provider.fileref.read(current_size - old_offset)
# Manually Close:
self.data_provider.close(flush=False)
def populate_vlrs(self):
'''Catalogue the variable length records'''
self.vlrs = []
self.seek(self.header.header_size, rel = False)
for i in xrange(self.get_header_property("num_variable_len_recs")):
new_vlr = laspy.header.VLR(None, None, None)
new_vlr.build_from_reader(self)
self.vlrs.append(new_vlr)
if self.data_provider._mmap.tell() > self.header.data_offset:
self.seek(self.header.data_offset, rel = False)
raise laspy.util.LaspyException("Error, Calculated Header Data "
"Overlaps The Point Records!")
self.vlr_stop = self.data_provider._mmap.tell()
return
def to_byte_string(self):
'''Pack the entire EVLR into a byte string.'''
if type(self.parsed_body) != type(None):
self.pack_data()
out = (self.pack("reserved", self.reserved) +
self.pack("user_id", self.user_id) +
self.pack("record_id", self.record_id) +
self.pack("rec_len_after_header", self.rec_len_after_header) +
self.pack("description", self.description) +
self.VLR_body)
diff = (self.rec_len_after_header - len(self.VLR_body))
if diff > 0:
out += b"\x00"*diff
elif diff < 0:
raise util.LaspyException("Invalid Data in EVLR: too long for specified rec_len." +
" rec_len_after_header = " + str(self.rec_len_after_header) +
" actual length = " + str(len(self.VLR_body)))
return(out)