def __init__(self, name, child_type, tag_dict, required, strict=DEFAULT_STRICT,
             minimum_length=None, maximum_length=None, docstring=None,
             array_extension=SerializableArray):
if not issubclass(array_extension, SerializableArray):
raise TypeError('array_extension must be a subclass of SerializableArray.')
self.child_type = child_type
tags = tag_dict[name]
self.array = tags.get('array', False)
if not self.array:
raise ValueError(
'Attribute {} is populated in the `_collection_tags` dictionary without `array`=True. '
'This is inconsistent with using _SerializableArrayDescriptor.'.format(name))
self.child_tag = tags['child_tag']
    # extract the unqualified class name from a repr like "<class 'module.Name'>"
    self._typ_string = 'numpy.ndarray[{}]:'.format(str(child_type).strip().split('.')[-1][:-2])
self.array_extension = array_extension
self.minimum_length = self._DEFAULT_MIN_LENGTH if minimum_length is None else int_func(minimum_length)
self.maximum_length = self._DEFAULT_MAX_LENGTH if maximum_length is None else int_func(maximum_length)
if self.minimum_length > self.maximum_length:
raise ValueError(
'Specified minimum length is {}, while specified maximum length is {}'.format(
self.minimum_length, self.maximum_length))
super(_SerializableArrayDescriptor, self).__init__(name, required, strict=strict, docstring=docstring)
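# A minimal, self-contained sketch of the descriptor pattern used above: a data
# descriptor that validates length bounds on assignment. The names here
# (BoundedListDescriptor, Example) are hypothetical illustrations, not part of
# the source.
class BoundedListDescriptor(object):
    def __init__(self, name, minimum_length=0, maximum_length=2**32):
        if minimum_length > maximum_length:
            raise ValueError(
                'Specified minimum length is {}, while specified maximum '
                'length is {}'.format(minimum_length, maximum_length))
        self.name = name
        self.minimum_length = minimum_length
        self.maximum_length = maximum_length

    def __get__(self, instance, owner):
        if instance is None:
            return self
        return instance.__dict__.get(self.name)

    def __set__(self, instance, value):
        if not (self.minimum_length <= len(value) <= self.maximum_length):
            raise ValueError(
                'Attribute {} must have length in [{}, {}]'.format(
                    self.name, self.minimum_length, self.maximum_length))
        instance.__dict__[self.name] = value


class Example(object):
    values = BoundedListDescriptor('values', minimum_length=1, maximum_length=3)


ex = Example()
ex.values = [1, 2]   # accepted
# ex.values = []     # would raise ValueError - too short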
@classmethod
def from_node(cls, node, xml_ns, ns_key=None, kwargs=None):
order1 = int_func(node.attrib['order1'])
order2 = int_func(node.attrib['order2'])
coefs = numpy.zeros((order1+1, order2+1), dtype=numpy.float64)
coef_key = cls._child_xml_ns_key.get('Coefs', ns_key)
coef_nodes = _find_children(node, 'Coef', xml_ns, coef_key)
for cnode in coef_nodes:
ind1 = int_func(cnode.attrib['exponent1'])
ind2 = int_func(cnode.attrib['exponent2'])
val = float(_get_node_value(cnode))
coefs[ind1, ind2] = val
return cls(Coefs=coefs)
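# Sketch of the XML shape the from_node above consumes, parsed here with the
# standard library for illustration. The Coef element and the order1/order2/
# exponent1/exponent2 attributes follow the code above; the Poly2D element
# name, the sample values, and the use of plain ElementTree in place of
# _find_children are assumptions.
from xml.etree import ElementTree

import numpy

xml = ('<Poly2D order1="1" order2="1">'
       '<Coef exponent1="0" exponent2="0">1.5</Coef>'
       '<Coef exponent1="1" exponent2="1">-0.25</Coef>'
       '</Poly2D>')
node = ElementTree.fromstring(xml)
coefs = numpy.zeros(
    (int(node.attrib['order1'])+1, int(node.attrib['order2'])+1), dtype=numpy.float64)
for cnode in node.findall('Coef'):
    coefs[int(cnode.attrib['exponent1']), int(cnode.attrib['exponent2'])] = float(cnode.text)
print(coefs)  # coefs is [[1.5, 0.0], [0.0, -0.25]]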
@classmethod
def from_node(cls, node, xml_ns, ns_key=None, kwargs=None):
dim1 = int_func(node.attrib['size'])
dim2 = int_func(node.attrib['numLuts'])
arr = numpy.zeros((dim1, dim2), dtype=numpy.uint16)
lut_key = cls._child_xml_ns_key.get('LUTValues', ns_key)
lut_nodes = _find_children(node, 'LUTValues', xml_ns, lut_key)
for i, lut_node in enumerate(lut_nodes):
        arr[:, i] = [int(el) for el in _get_node_value(lut_node).split()]
    if numpy.max(arr) < 256:
        # every value fits in 8 bits, so store the table more compactly
        arr = arr.astype(numpy.uint8)
return cls(LUTValues=arr)
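# Standalone check of the downcast decision above: the uint16 table is only
# narrowed to uint8 when every entry fits in 8 bits. arr.astype is the modern
# replacement for the long-deprecated numpy.cast[...] syntax.
import numpy

arr = numpy.array([[0, 17], [255, 3]], dtype=numpy.uint16)
if numpy.max(arr) < 256:
    arr = arr.astype(numpy.uint8)
print(arr.dtype)  # uint8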
# get the bytes offset for this NITF image segment
this_rows, this_cols = img_header.NROWS, img_header.NCOLS
if this_rows > rows or this_cols > cols:
raise ValueError(
'NITF image segment at index {} has size ({}, {}), and cannot be part of an image of size '
'({}, {})'.format(index, this_rows, this_cols, rows, cols))
# horizontal block details
horizontal_block_size = this_cols
if img_header.NBPR != 1:
if (this_cols % img_header.NBPR) != 0:
raise ValueError(
'The number of blocks per row is listed as {}, but this is '
'not equally divisible into the number of columns {}'.format(img_header.NBPR, this_cols))
horizontal_block_size = int_func(this_cols/img_header.NBPR)
# vertical block details
vertical_block_size = this_rows
if img_header.NBPC != 1:
if (this_rows % img_header.NBPC) != 0:
raise ValueError(
'The number of blocks per column is listed as {}, but this is '
'not equally divisible into the number of rows {}'.format(img_header.NBPC, this_rows))
vertical_block_size = int_func(this_rows/img_header.NBPC)
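# e.g. (illustrative numbers): a segment with NCOLS=3072 and NBPR=3 gives
# horizontal_block_size = 1024; with NROWS=2048 and NBPC=2,
# vertical_block_size = 1024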
# determine where this image segment fits in the overall image
if i == 0:
# establish the beginning
cur_row_start, cur_row_end = 0, this_rows
cur_col_start, cur_col_end = 0, this_cols
elif p_col_end < cols:
    # the previous segment has not completed its row of the image - continue
    # along the same rows (reconstruction of a truncated branch; p_row_start,
    # p_row_end, p_col_end track the previous segment's bounds)
    cur_row_start, cur_row_end = p_row_start, p_row_end
    cur_col_start, cur_col_end = p_col_end, p_col_end + this_cols
else:
    # the previous row of segments is complete - start a new row of segments
    cur_row_start, cur_row_end = p_row_end, p_row_end + this_rows
    cur_col_start, cur_col_end = 0, this_cols
def subset(rng, start_ind, stop_ind):
    # determine the overlap of the index range `rng = (start, stop, step)`
    # with the bounds [start_ind, stop_ind). NB: the positive-step branch here
    # is reconstructed to mirror the negative-step branch below.
    if rng[2] > 0:
        if rng[1] <= start_ind or rng[0] >= stop_ind:
            return None, None
        # find smallest element rng[0] + mult*rng[2] which is >= start_ind
        mult1 = 0 if rng[0] >= start_ind else int_func(numpy.ceil((start_ind - rng[0])/rng[2]))
        ind1 = rng[0] + mult1*rng[2]
        # find largest element rng[0] + mult*rng[2] which is <= min(stop_ind, rng[1]) - 1
        mult2 = int_func(numpy.floor((min(stop_ind, rng[1]) - 1 - rng[0])/rng[2]))
        ind2 = rng[0] + mult2*rng[2]
else:
if rng[0] < start_ind or rng[1] >= stop_ind:
return None, None
# find largest element rng[0] + mult*rng[2] which is <= stop_ind-1
        mult1 = 0 if rng[0] < stop_ind else int_func(numpy.ceil((stop_ind - 1 - rng[0])/rng[2]))
ind1 = rng[0] + mult1*rng[2]
# find smallest element rng[0] + mult*rng[2] which is >= max(start_ind, rng[1]+1)
        mult2 = int_func(numpy.floor((start_ind - rng[0])/rng[2])) if rng[1] < start_ind \
            else int_func(numpy.floor((rng[1] + 1 - rng[0])/rng[2]))
ind2 = rng[0] + mult2*rng[2]
return (ind1, ind2, rng[2]), (mult1, mult2)
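# Worked example of the positive-step overlap arithmetic above (values are
# illustrative): rng = (3, 20, 4) visits 3, 7, 11, 15, 19; clipping to the
# block [10, 18) should keep 11 and 15.
crange, cinds = subset((3, 20, 4), 10, 18)
print(crange, cinds)  # (11, 15, 4) (2, 3)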
range1, range2 = self._reorder_arguments(range1, range2)
# ceil handles ranges whose extent is not an exact multiple of the step
rows_size = int_func(numpy.ceil((range1[1]-range1[0])/range1[2]))
cols_size = int_func(numpy.ceil((range2[1]-range2[0])/range2[2]))
if self._bands_ip == 1:
out = numpy.empty((rows_size, cols_size), dtype=numpy.complex64)
else:
out = numpy.empty((rows_size, cols_size, self._bands_ip), dtype=numpy.complex64)
for entry, child_chipper in zip(self._bounds, self._child_chippers):
row_start, row_end, col_start, col_end = entry
# find row overlap for chipper - it's rectangular
crange1, cinds1 = subset(range1, row_start, row_end)
if crange1 is None:
continue # there is no row overlap for this chipper
# find column overlap for chipper - it's rectangular
crange2, cinds2 = subset(range2, col_start, col_end)
if crange2 is None:
continue # there is no column overlap for this chipper
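# Standalone sketch of the aggregation pattern above: each child covers a
# rectangular sub-region of the output, and only overlapping slices are copied
# in. The bounds and chip values are illustrative.
import numpy

out = numpy.zeros((4, 4), dtype=numpy.complex64)
bounds = [(0, 2, 0, 4), (2, 4, 0, 4)]  # (row_start, row_end, col_start, col_end)
chips = [numpy.ones((2, 4)), 2*numpy.ones((2, 4))]
for (row_start, row_end, col_start, col_end), chip in zip(bounds, chips):
    out[row_start:row_end, col_start:col_end] = chip
print(out[:, 0])  # [1.+0.j 1.+0.j 2.+0.j 2.+0.j]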
"""
start, stop, step = None, None, None
if arg is None:
pass
elif isinstance(arg, integer_types):
step = arg
else:
# NB: following this pattern to avoid confused pycharm inspection
if len(arg) == 1:
step = arg[0]
elif len(arg) == 2:
stop, step = arg
elif len(arg) == 3:
start, stop, step = arg
start = 0 if start is None else int_func(start)
stop = siz if stop is None else int_func(stop)
step = 1 if step is None else int_func(step)
# basic validity check
    if not (-siz < start < siz):
        raise ValueError(
            'Range argument {0} has extracted start {1}, which is required '
            'to be in the range (-{2}, {2})'.format(arg, start, siz))
    if not (-siz < stop <= siz):
        raise ValueError(
            'Range argument {0} has extracted stop {1}, which is required '
            'to be in the range (-{2}, {2}]'.format(arg, stop, siz))
    if not ((0 < step < siz) or (-siz < step < 0)):
        raise ValueError(
            'Range argument {0} has extracted step {1}, which is not viable '
            'for an axis of length {2}'.format(arg, step, siz))
    if ((step < 0) and (stop > start)) or ((step > 0) and (start > stop)):
        raise ValueError(
            'Range argument {} has extracted start {}, stop {}, step {}, '
            'which is not viable'.format(arg, start, stop, step))
    return start, stop, step
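# Quick illustration of the normalization above (function name as
# reconstructed from the truncated snippet):
print(validate_range(None, 10))       # (0, 10, 1)
print(validate_range(2, 10))          # (0, 10, 2)
print(validate_range((3, 9, 2), 10))  # (3, 9, 2)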
def __init__(self, name, tag_dict, required, strict=DEFAULT_STRICT,
minimum_length=None, maximum_length=None, docstring=None):
self.child_tag = tag_dict[name]['child_tag']
self.minimum_length = self._DEFAULT_MIN_LENGTH if minimum_length is None else int_func(minimum_length)
self.maximum_length = self._DEFAULT_MAX_LENGTH if maximum_length is None else int_func(maximum_length)
if self.minimum_length > self.maximum_length:
raise ValueError(
'Specified minimum length is {}, while specified maximum length is {}'.format(
self.minimum_length, self.maximum_length))
super(_IntegerListDescriptor, self).__init__(name, required, strict=strict, docstring=docstring)
if not (isinstance(complex_type, bool) or callable(complex_type)):
    raise ValueError('complex_type must be a boolean or a callable')
self._complex_type = complex_type
if self._complex_type is True and self._data_type.name != 'float32':
raise ValueError(
'complex_type = `True`, which requires that data for writing has '
'dtype complex64/128, and output is written as float32 (data_type). '
'data_type is given as {}.'.format(data_type))
if callable(self._complex_type) and self._data_type.name not in ('uint8', 'int16'):
    raise ValueError(
        'complex_type is callable, which requires that data for writing has '
        'dtype complex64/128, and output is written as uint8 or int16. '
        'data_type is given as {}.'.format(self._data_type.name))
self._data_offset = int_func(data_offset)
if self._complex_type is False:
self._shape = self._data_size
else:
self._shape = (self._data_size[0], self._data_size[1], 2)
self._memory_map = None
self._fid = None
try:
self._memory_map = numpy.memmap(self._file_name,
dtype=self._data_type,
mode='r+',
offset=self._data_offset,
shape=self._shape)
except (OverflowError, OSError):
    # on 32-bit python the memmap creation fails for any file larger than 2GB -
    # fall back to a slower manual write (reconstructed fallback: open the
    # file handle used by _call below)
    self._fid = open(self._file_name, 'r+b')
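# What complex_type=True implies for the layout: complex64 pixels are written
# as interleaved float32 (real, imag) pairs. A standalone sketch of that view:
import numpy

cdata = numpy.array([[1+2j, 3-4j]], dtype=numpy.complex64)
as_float = cdata.view(numpy.float32).reshape((1, 2, 2))
print(as_float)  # [[[ 1.  2.]  [ 3. -4.]]]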
def _call(self, start1, stop1, start2, stop2, data):
    if self._memory_map is not None:
        if data.ndim == 2:
            # two-dimensional data (complex_type is False)
            self._memory_map[start1:stop1, start2:stop2] = data
        else:
            self._memory_map[start1:stop1, start2:stop2, :] = data
        return
# we have to fall-back to manually write
element_size = int_func(self._data_type.itemsize)
if len(self._shape) == 3:
element_size *= int_func(self._shape[2])
stride = element_size*int_func(self._data_size[0])
# go to the appropriate spot in the file for first entry
self._fid.seek(self._data_offset + stride*start1 + element_size*start2)
if start1 == 0 and stop1 == self._data_size[0]:
# we can write the block all at once
data.astype(self._data_type).tofile(self._fid)
else:
# have to write one row at a time
bytes_to_skip_per_row = element_size*(self._data_size[0]-(stop1-start1))
for i, row in enumerate(data):
            # write the row, and then skip to where the next row starts
row.astype(self._data_type).tofile(self._fid)
if i < len(data) - 1:
                # don't seek after the final entry - seeking past the end of the file can fail
self._fid.seek(bytes_to_skip_per_row, os.SEEK_CUR)
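# Minimal standalone sketch of the strided-write pattern above: write a 2x3
# sub-block into a pre-sized binary file of 8-column rows, one row at a time.
# The file name and sizes are illustrative.
import os

import numpy

full_cols = 8
dtype = numpy.dtype(numpy.float32)
block = numpy.arange(6, dtype=numpy.float32).reshape((2, 3))
bytes_to_skip = dtype.itemsize*(full_cols - block.shape[1])
with open('demo.bin', 'w+b') as fid:
    fid.truncate(2*full_cols*dtype.itemsize)  # pre-size two full rows
    for i, row in enumerate(block):
        row.tofile(fid)
        if i < len(block) - 1:
            fid.seek(bytes_to_skip, os.SEEK_CUR)  # jump to the next row's start
os.remove('demo.bin')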