Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
grid_lon, grid_lat = np.meshgrid(grid_lon, grid_lat, indexing="xy")
# Flatten the grids:
grid_x = grid_x.flatten()
grid_y = grid_y.flatten()
grid_lon = grid_lon.flatten()
grid_lat = grid_lat.flatten()
# Calculate and compare distance matrices ensuring that that part
# of the workflow works as intended (tested: 2e-9 is currently the
# match for this setup):
d_eucl = cdist(
np.concatenate([x[:, np.newaxis], y[:, np.newaxis]], axis=1),
np.concatenate([grid_x[:, np.newaxis], grid_y[:, np.newaxis]], axis=1),
)
d_geo = core.great_circle_distance(
lon[:, np.newaxis],
lat[:, np.newaxis],
grid_lon[np.newaxis, :],
grid_lat[np.newaxis, :],
)
assert_allclose(d_eucl, d_geo, rtol=2e-9)
# Create ordinary kriging objects:
OK_geo = OrdinaryKriging(
lon,
lat,
z,
variogram_model="linear",
verbose=False,
enable_plotting=False,
coordinates_type="geographic",
np.testing.assert_equal(d <= 180.0, np.ones((N, N), dtype=bool))
# Test great_circle_distance and euclid3_to_great_circle against each other
lon_ref = lon
lat_ref = lat
for i in range(len(lon_ref)):
lon, lat = np.meshgrid(np.linspace(0, 360.0, 20), np.linspace(-90.0, 90.0, 20))
dx = np.cos(np.pi / 180.0 * lon) * np.cos(np.pi / 180.0 * lat) - np.cos(
np.pi / 180.0 * lon_ref[i]
) * np.cos(np.pi / 180.0 * lat_ref[i])
dy = np.sin(np.pi / 180.0 * lon) * np.cos(np.pi / 180.0 * lat) - np.sin(
np.pi / 180.0 * lon_ref[i]
) * np.cos(np.pi / 180.0 * lat_ref[i])
dz = np.sin(np.pi / 180.0 * lat) - np.sin(np.pi / 180.0 * lat_ref[i])
assert_allclose(
core.great_circle_distance(lon_ref[i], lat_ref[i], lon, lat),
core.euclid3_to_great_circle(np.sqrt(dx ** 2 + dy ** 2 + dz ** 2)),
rtol=1e-5,
)
# >>> d *= 180.0/np.pi
# From that distance matrix, the reference values have been obtained.
d_ref = np.array(
[
[0.0, 1e-4, 180.0, 98.744848317171801],
[1e-4, 0.0, 179.9999, 98.744946828324345],
[180.0, 179.9999, 0.0, 81.255151682828213],
[98.744848317171801, 98.744946828324345, 81.255151682828213, 0.0],
]
)
# Calculate distance matrix using the PyKrige code:
d = np.zeros((N, N))
for i in range(N):
for j in range(N):
d[i, j] = core.great_circle_distance(lon[i], lat[i], lon[j], lat[j])
# Test agains reference values:
assert_allclose(d, d_ref)
# Test general features:
assert_allclose(d[np.eye(N, dtype=bool)], 0.0)
np.testing.assert_equal(d >= 0.0, np.ones((N, N), dtype=bool))
assert_allclose(d, d.T)
np.testing.assert_equal(d <= 180.0, np.ones((N, N), dtype=bool))
# Test great_circle_distance and euclid3_to_great_circle against each other
lon_ref = lon
lat_ref = lat
for i in range(len(lon_ref)):
lon, lat = np.meshgrid(np.linspace(0, 360.0, 20), np.linspace(-90.0, 90.0, 20))
dx = np.cos(np.pi / 180.0 * lon) * np.cos(np.pi / 180.0 * lat) - np.cos(
def _get_kriging_matrix(self, n):
"""Assembles the kriging matrix."""
if self.coordinates_type == "euclidean":
xy = np.concatenate(
(self.X_ADJUSTED[:, np.newaxis], self.Y_ADJUSTED[:, np.newaxis]), axis=1
)
d = cdist(xy, xy, "euclidean")
elif self.coordinates_type == "geographic":
d = core.great_circle_distance(
self.X_ADJUSTED[:, np.newaxis],
self.Y_ADJUSTED[:, np.newaxis],
self.X_ADJUSTED,
self.Y_ADJUSTED,
)
a = np.zeros((n + 1, n + 1))
a[:n, :n] = -self.variogram_function(self.variogram_model_parameters, d)
np.fill_diagonal(a, 0.0)
a[n, :] = 1.0
a[:, n] = 1.0
a[n, n] = 0.0
return a
np.sin(lat_p),
),
axis=1,
)
from scipy.spatial import cKDTree
tree = cKDTree(xy_data)
bd, bd_idx = tree.query(xy_points, k=n_closest_points, eps=0.0)
if self.coordinates_type == "geographic":
# Between the nearest neighbours from Euclidean search,
# calculate the great circle distance using the standard method:
x_points = np.tile(xpts[:, np.newaxis], (1, n_closest_points))
y_points = np.tile(ypts[:, np.newaxis], (1, n_closest_points))
bd = core.great_circle_distance(
x_points, y_points, self.X_ADJUSTED[bd_idx], self.Y_ADJUSTED[bd_idx]
)
if backend == "loop":
zvalues, sigmasq = self._exec_loop_moving_window(a, bd, mask, bd_idx)
elif backend == "C":
zvalues, sigmasq = _c_exec_loop_moving_window(
a, bd, mask.astype("int8"), bd_idx, self.X_ADJUSTED.shape[0], c_pars
)
else:
raise ValueError(
"Specified backend {} for a moving window "
"is not supported.".format(backend)
)
else:
if self.coordinates_type == "euclidean":
if backend == "loop":
zvalues, sigmasq = self._exec_loop_moving_window(a, bd, mask, bd_idx)
elif backend == "C":
zvalues, sigmasq = _c_exec_loop_moving_window(
a, bd, mask.astype("int8"), bd_idx, self.X_ADJUSTED.shape[0], c_pars
)
else:
raise ValueError(
"Specified backend {} for a moving window "
"is not supported.".format(backend)
)
else:
if self.coordinates_type == "euclidean":
bd = cdist(xy_points, xy_data, "euclidean")
elif self.coordinates_type == "geographic":
bd = core.great_circle_distance(
xpts[:, np.newaxis],
ypts[:, np.newaxis],
self.X_ADJUSTED,
self.Y_ADJUSTED,
)
if backend == "vectorized":
zvalues, sigmasq = self._exec_vector(a, bd, mask)
elif backend == "loop":
zvalues, sigmasq = self._exec_loop(a, bd, mask)
elif backend == "C":
zvalues, sigmasq = _c_exec_loop(
a, bd, mask.astype("int8"), self.X_ADJUSTED.shape[0], c_pars
)
else:
raise ValueError(