Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from itertools import groupby
from math import floor, ceil
import dask.array as da
import numpy as np
import numba as nb
from dask.delayed import delayed
from numba import prange
from .utils import ngjit
try:
# Try to create numba JIT with 'parallel' target
ngjit_parallel = nb.jit(nopython=True, nogil=True, parallel=True)
except:
ngjit_parallel, prange = ngjit, range # NOQA
#: Interpolation method for upsampling: Take nearest source grid cell, even if it is invalid.
US_NEAREST = 10
#: Interpolation method for upsampling: Bi-linear interpolation between the 4 nearest source grid cells.
US_LINEAR = 11
#: Aggregation method for downsampling: Take first valid source grid cell, ignore contribution areas.
DS_FIRST = 50
#: Aggregation method for downsampling: Take last valid source grid cell, ignore contribution areas.
DS_LAST = 51
#: Aggregation method for downsampling: Take the minimum source grid cell value, ignore contribution areas.
DS_MIN = 52
#: Aggregation method for downsampling: Take the maximum source grid cell value, ignore contribution areas.
DS_MAX = 53
#: Aggregation method for downsampling: Compute average of all valid source grid cells,
@ngjit
def masked_clip_2d(data, mask, lower, upper):
"""
Clip the elements of an input array between lower and upper bounds,
skipping over elements that are masked out.
Parameters
----------
data: np.ndarray
Numeric ndarray that will be clipped in-place
mask: np.ndarray
Boolean ndarray where True values indicate elements that should be
skipped
lower: int or float
Lower bound to clip to
upper: int or float
Upper bound to clip to
@ngjit
def _hilbert_integer_to_transpose(p, h):
"""Store a hilbert integer (`h`) as its transpose (`x`).
Args:
p (int): iterations to use in the hilbert curve
h (int): integer distance along hilbert curve
Returns:
x (list): transpose of h
(n components with values between 0 and 2**p-1)
"""
n = 2
h_bits = _int_2_binary(h, p * n)
x = [_binary_2_int(h_bits[i::n]) for i in range(n)]
return x
@ngjit
def _transpose_to_hilbert_integer(p, x, y):
"""Restore a hilbert integer (`h`) from its transpose (`x`).
Args:
p (int): iterations to use in the hilbert curve
x (list): transpose of h
(n components with values between 0 and 2**p-1)
Returns:
h (int): integer distance along hilbert curve
"""
bin1 = _int_2_binary(x, p)
bin2 = _int_2_binary(y, p)
concat = np.zeros(2*p, dtype=np.uint8)
for i in range(p):
concat[2*i] = bin1[i]
@ngjit
def draw_triangle(verts, bbox, biases, aggs, val):
"""Draw a triangle on a grid.
Plots a triangle with integer coordinates onto a pixel grid,
clipping to the bounds. The vertices are assumed to have
already been scaled and transformed.
"""
minx, maxx, miny, maxy = bbox
if minx == maxx and miny == maxy:
# Subpixel case; area == 0
append(minx, miny, *(aggs + (val,)))
else:
(ax, ay), (bx, by), (cx, cy) = verts
bias0, bias1, bias2 = biases
for j in range(miny, maxy+1):
for i in range(minx, maxx+1):
@ngjit
@expand_aggs_and_cols
def extend_cpu(
sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols
):
nrows, ncols = xs.shape
for i in range(nrows):
for j in range(ncols - 1):
perform_extend(
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax,
xs, ys, *aggs_and_cols
)
@ngjit
def bounds_interleaved(values):
"""
compute bounds
"""
xmin = np.inf
ymin = np.inf
xmax = -np.inf
ymax = -np.inf
for i in range(0, len(values), 2):
x = values[i]
if np.isfinite(x):
xmin = min(xmin, x)
xmax = max(xmax, x)
y = values[i + 1]
@ngjit
def _ndvi(nir_data, red_data):
out = np.zeros_like(nir_data)
rows, cols = nir_data.shape
for y in range(0, rows):
for x in range(0, cols):
nir = nir_data[y, x]
red = red_data[y, x]
if nir == red: # cover zero divison case
continue
soma = nir + red
out[y, x] = (nir - red) / soma
return out
@ngjit
def _binary(data, values):
out = np.zeros_like(data)
rows, cols = data.shape
for x in range(0, rows):
for y in range(0, cols):
if data[y, x] in values:
out[y, x] = True
else:
out[y, x] = False
return out
@ngjit
@expand_aggs_and_cols
def perform_extend_line(
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols
):
x0 = xs[i, j]
y0 = ys[j]
x1 = xs[i, j + 1]
y1 = ys[j + 1]
segment_start = (
(j == 0) or isnull(xs[i, j - 1]) or isnull(ys[j - 1])
)
draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax,
segment_start, x0, x1, y0, y1, *aggs_and_cols)