Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def cdf_create(fn: Path, spec: dict):
    """Create and return a writable CDF object at *fn* built from *spec*.

    Parameters:
        fn : Path
            Destination path for the new CDF file.
        spec : dict
            CDF specification passed through as ``cdf_spec``.
    """
    # Convert to a plain string first: the Python 3.5-era cdfwrite.CDF
    # constructor does not accept pathlib.Path objects directly.
    path_str = str(fn)
    return cdfwrite.CDF(path_str, cdf_spec=spec)
tailVXR = 0
flags = 0
if recVary:
flags = CDF._set_bit(flags, 0)
flags = CDF._set_bit(flags, 1)
sRecords = sparse
rfuB = 0
rfuC = -1
rfuF = -1
if zVar:
num = len(self.zvars)
else:
num = len(self.rvars)
if compression > 0:
offsetCPRorSPR = self._write_cpr(f, CDF.GZIP_COMPRESSION,
compression)
flags = CDF._set_bit(flags, 2)
else:
offsetCPRorSPR = -1
if blockingfactor is None:
blockingFactor = 1
else:
blockingFactor = blockingfactor
# Increase the block size to account for "zDimSizes" and "DimVarys" fields
if numDims > 0:
if zVar:
block_size = block_size + numDims * 8
else:
block_size = block_size + numDims * 4
compression : int
The level of compression between 0-9
pad : num
The pad values to insert
zVar : bool
True if this variable is a z variable
Returns:
num : int
The number of the variable
byte_loc : int
The current byte location within the file
'''
if zVar:
block_size = CDF.zVDR_BASE_SIZE64
section_type = CDF.zVDR_
else:
block_size = CDF.rVDR_BASE_SIZE64
section_type = CDF.rVDR_
nextVDR = 0
dataType = cdataType
if dataType == -1:
raise ValueError('Bad data type.')
maxRec = -1
headVXR = 0
tailVXR = 0
flags = 0
if recVary:
flags = CDF._set_bit(flags, 0)
tofill = 256 - len(name)
vdr[84:340] = (name+'\0'*tofill).encode()
if zVar:
vdr[340:344] = struct.pack('>i', numDims)
if (numDims > 0):
for i in range(0, numDims):
vdr[344+i*4:344+(i+1)*4] = struct.pack('>i', dimSizes[i])
ist = 344+numDims*4
for i in range(0, numDims):
vdr[ist+i*4:ist+(i+1)*4] = struct.pack('>i', CDF.VARY)
ist = 344 + 8 * numDims
else:
if (numDims > 0):
for i in range(0, numDims):
if (dimVary[i] == True or dimVary[i] != 0):
vdr[340+i*4:344+i*4] = struct.pack('>i', CDF.VARY)
else:
vdr[340+i*4:344+i*4] = struct.pack('>i', CDF.NOVARY)
ist = 340 + 4 * numDims
vdr[ist:block_size] = pad
f.write(vdr)
# Set variable info
info = []
info.append(name)
info.append(byte_loc)
if zVar:
info.append(numDims)
info.append(dimSizes)
else:
info.append(self.num_rdim)
info.append(self.rdim_sizes)
form = tofrom + str(size) + dt_string
return recs, struct.pack(form, *indata)
elif (isinstance(indata, bytes)):
tofrom = self._convert_option()
recs = int(len(indata) / recSize)
dt_string = CDF._convert_type(data_type)
size = recs * num_values * num_elems
if (data_type == CDF.CDF_EPOCH16):
size = size * 2
form = str(size) + dt_string
form2 = tofrom + form
datau = struct.unpack(form, indata)
return recs, struct.pack(form2, *datau)
elif (isinstance(indata, np.ndarray)):
tofrom = self._convert_option()
npdata = CDF._convert_nptype(data_type, indata)
if indata.size == num_values: # Check if only one record is being read in
recs = 1
else:
recs = len(indata)
dt_string = CDF._convert_type(data_type)
if (data_type == CDF.CDF_EPOCH16):
num_elems = 2 * num_elems
form = str(recs*num_values*num_elems) + dt_string
form2 = tofrom + str(recs*num_values*num_elems) + dt_string
datau = struct.unpack(form, npdata)
return recs, struct.pack(form2, *datau)
elif (isinstance(indata, str)):
return 1, indata.ljust(num_elems, '\x00').encode()
else:
tofrom = self._convert_option()
dt_string = CDF._convert_type(data_type)
The total number of VXRs
Returns:
newVXRhead : int
The byte location of the newest VXR head
newvxroff : int
The byte location of the last VXR head
'''
newNumVXRs = int(numVXRs / CDF.NUM_VXRlvl_ENTRIES)
remaining = int(numVXRs % CDF.NUM_VXRlvl_ENTRIES)
vxroff = vxrhead
prevxroff = -1
if (remaining != 0):
newNumVXRs += 1
CDF.level += 1
for x in range(0, newNumVXRs):
newvxroff = self._write_vxr(f, numEntries=CDF.NUM_VXRlvl_ENTRIES)
if (x > 0):
self._update_offset_value(f, prevxroff+12, 8, newvxroff)
else:
newvxrhead = newvxroff
prevxroff = newvxroff
if (x == (newNumVXRs - 1)):
if (remaining == 0):
endEntry = CDF.NUM_VXRlvl_ENTRIES
else:
endEntry = remaining
else:
endEntry = CDF.NUM_VXRlvl_ENTRIES
for _ in range(0, endEntry):
recFirst, recLast = self._get_recrange(f, vxroff)
# Dictionary object, contains name, offset, and scope (global or variable)
self.attrsinfo = {}
self.gattrs = [] # List of global attributes
self.vattrs = [] # List of variable attributes
self.attrs = [] # List of ALL attributes
self.zvars = [] # List of z variable names
self.rvars = [] # List of r variable names
self.checksum = checksum # Boolean, whether or not to include the checksum at the end
self.compression = cdf_compression # Compression level (or True/False)
self.num_rdim = num_rdim # Number of r dimensions
self.rdim_sizes = rdim_sizes # Size of r dimensions
self.majority = major
with path.open('wb') as f:
f.write(binascii.unhexlify(CDF.V3magicNUMBER_1))
f.write(binascii.unhexlify(CDF.V3magicNUMBER_2))
self.cdr_head = self._write_cdr(f, major, self._encoding, checksum)
self.gdr_head = self._write_gdr(f)
self.offset = f.tell()
Parameters:
f : file
Uncompressed file to read from
g : file
File to read the compressed file into
level : int
The level of the compression from 0 to 9
Returns: None
'''
f.seek(8)
data = f.read()
uSize = len(data)
section_type = CDF.CCR_
rfuA = 0
cData = gzip.compress(data, level)
block_size = CDF.CCR_BASE_SIZE64 + len(cData)
cprOffset = 0
ccr1 = bytearray(32)
# ccr1[0:4] = binascii.unhexlify(CDF.V3magicNUMBER_1)
# ccr1[4:8] = binascii.unhexlify(CDF.V3magicNUMBER_2c)
ccr1[0:8] = struct.pack('>q', block_size)
ccr1[8:12] = struct.pack('>i', section_type)
ccr1[12:20] = struct.pack('>q', cprOffset)
ccr1[20:28] = struct.pack('>q', uSize)
ccr1[28:32] = struct.pack('>i', rfuA)
g.seek(0, 2)
g.write(ccr1)
g.write(cData)
cprOffset = self._write_cpr(g, CDF.GZIP_COMPRESSION, level)
def _datatype_define(value):
    """Infer the CDF data type for a single Python scalar.

    Parameters:
        value : str, int, float, or complex
            The scalar whose CDF representation is needed.

    Returns:
        (num_elems, data_type) : tuple
            Element count and CDF type constant, or ``(None, None)``
            (with a warning) when *value* has no supported mapping.
    """
    # Strings are CDF_CHAR; the element count is the character count.
    if isinstance(value, str):
        return len(value), CDF.CDF_CHAR
    # All non-string scalars occupy a single element.
    single = 1
    # NOTE: isinstance order matters — bool is an int subclass, so a bool
    # falls into the CDF_INT8 branch here, matching the original chain.
    if isinstance(value, int):
        return single, CDF.CDF_INT8
    if isinstance(value, float):
        return single, CDF.CDF_DOUBLE
    if isinstance(value, complex):
        return single, CDF.CDF_EPOCH16
    warnings.warn('Invalid data type for data.... Skip')
    return None, None