Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ok_geographic():
    """Smoke-test ordinary kriging with geographic coordinates.

    Random sample points and random grid coordinates are drawn from a
    fixed seed, a geographic ``OrdinaryKriging`` model with a linear
    variogram is built from the samples, and the model is executed on the
    grid.  No values are asserted: the test passes as long as
    construction and execution do not raise.
    """
    np.random.seed(89239413)
    n_samples, n_grid = 50, 120

    # The draw order (lon, lat, z, then the grid) is kept fixed so that
    # the seeded random data stays reproducible.
    lon = 360.0 * np.random.rand(n_samples, 1)
    lat = 180.0 * np.random.rand(n_samples, 1) - 90.0
    z = np.random.rand(n_samples, 1)

    grid_lon = 360.0 * np.random.rand(n_grid, 1)
    grid_lat = 180.0 * np.random.rand(n_grid, 1) - 90.0

    # Options shared by the kriging model under test.
    kriging_options = {
        "variogram_model": "linear",
        "verbose": False,
        "enable_plotting": False,
        "coordinates_type": "geographic",
    }
    model = OrdinaryKriging(lon, lat, z, **kriging_options)

    # The kriged values and variances are unpacked but not inspected.
    _z_grid, _ss_grid = model.execute("grid", grid_lon, grid_lat)
# Compare the Euclidean and great-circle distance matrices to ensure that
# this part of the workflow works as intended (tested: 2e-9 is currently
# the match for this setup):
d_eucl = cdist(
np.concatenate([x[:, np.newaxis], y[:, np.newaxis]], axis=1),
np.concatenate([grid_x[:, np.newaxis], grid_y[:, np.newaxis]], axis=1),
)
d_geo = core.great_circle_distance(
lon[:, np.newaxis],
lat[:, np.newaxis],
grid_lon[np.newaxis, :],
grid_lat[np.newaxis, :],
)
assert_allclose(d_eucl, d_geo, rtol=2e-9)
# Create ordinary kriging objects:
OK_geo = OrdinaryKriging(
lon,
lat,
z,
variogram_model="linear",
verbose=False,
enable_plotting=False,
coordinates_type="geographic",
)
OK_xy = OrdinaryKriging(
x, y, z, variogram_model="linear", verbose=False, enable_plotting=False
)
OK_wrong = OrdinaryKriging(
lon, lat, z, variogram_model="linear", verbose=False, enable_plotting=False
)
# Execute on grid:
def test_ok_backends_produce_same_result(validation_ref):
    """Check that the "vectorized" and "loop" backends agree on a grid.

    Parameters
    ----------
    validation_ref : tuple
        Fixture value; only its first element (the data array whose
        first three columns are passed to ``OrdinaryKriging``) is used.
        The remaining elements are unpacked and ignored.
    """
    data, _, (_uk_answer, _gridx_ref, _gridy_ref) = validation_ref
    x_obs, y_obs, values = data[:, 0], data[:, 1], data[:, 2]

    gridx = np.linspace(1067000.0, 1072000.0, 100)
    gridy = np.linspace(241500.0, 244000.0, 100)

    ok = OrdinaryKriging(
        x_obs,
        y_obs,
        values,
        variogram_model="linear",
        verbose=False,
        enable_plotting=False,
    )

    # Run both backends on the same grid, vectorized first, then loop.
    results = {}
    for backend in ("vectorized", "loop"):
        results[backend] = ok.execute("grid", gridx, gridy, backend=backend)

    z_vectorized, ss_vectorized = results["vectorized"]
    z_loop, ss_loop = results["loop"]

    assert_allclose(z_vectorized, z_loop)
    assert_allclose(ss_vectorized, ss_loop)
data[:, 0],
data[:, 1],
data[:, 2],
variogram_model="custom",
variogram_function=func,
)
ok = OrdinaryKriging(
data[:, 0],
data[:, 1],
data[:, 2],
variogram_model="custom",
variogram_parameters=[1.0, 1.0, 1.0],
variogram_function=func,
)
assert ok.variogram_function([1.0, 1.0, 1.0], 1.0) == approx(1.3010, rel=1e-4)
ok = OrdinaryKriging(data[:, 0], data[:, 1], data[:, 2], variogram_model="linear")
ok.update_variogram_model(
"custom", variogram_parameters=[1.0, 1.0, 1.0], variogram_function=func
)
assert ok.variogram_function([1.0, 1.0, 1.0], 1.0) == approx(1.3010, rel=1e-4)
OrdinaryKriging(
data[:, 0],
data[:, 1],
data[:, 2],
variogram_model="custom",
variogram_function=0,
)
with pytest.raises(ValueError):
OrdinaryKriging(
data[:, 0],
data[:, 1],
data[:, 2],
variogram_model="custom",
variogram_function=func,
)
ok = OrdinaryKriging(
data[:, 0],
data[:, 1],
data[:, 2],
variogram_model="custom",
variogram_parameters=[1.0, 1.0, 1.0],
variogram_function=func,
)
assert ok.variogram_function([1.0, 1.0, 1.0], 1.0) == approx(1.3010, rel=1e-4)
ok = OrdinaryKriging(data[:, 0], data[:, 1], data[:, 2], variogram_model="linear")
ok.update_variogram_model(
"custom", variogram_parameters=[1.0, 1.0, 1.0], variogram_function=func
)
assert ok.variogram_function([1.0, 1.0, 1.0], 1.0) == approx(1.3010, rel=1e-4)
def test_kriging_tools(sample_data_2d):
    """Round-trip a kriged grid through the ASCII grid writer and reader.

    The grid produced by ``OrdinaryKriging.execute`` is written with
    ``kt.write_asc_grid`` and read back with ``kt.read_asc_grid``; the
    values and both coordinate axes must survive the round trip.

    Parameters
    ----------
    sample_data_2d : tuple
        Fixture value unpacked as ``data, (gridx, gridy, gridx_2),
        mask_ref``.  Only ``data``, ``gridx`` and ``gridy`` are used here.
    """
    data, (gridx, gridy, _gridx_2), _mask_ref = sample_data_2d

    ok = OrdinaryKriging(data[:, 0], data[:, 1], data[:, 2])
    z_write, _ss_write = ok.execute("grid", gridx, gridy)

    # Build the scratch path once so the writer and reader cannot drift.
    asc_path = os.path.join(BASE_DIR, "test_data/temp.asc")
    try:
        kt.write_asc_grid(gridx, gridy, z_write, filename=asc_path, style=1)
        z_read, x_read, y_read, _cellsize, _no_data = kt.read_asc_grid(asc_path)
    finally:
        # Always remove the scratch file so the test leaves no artifact in
        # the test-data directory, even when writing or reading fails.
        if os.path.exists(asc_path):
            os.remove(asc_path)

    # Loose tolerances on the values, default tolerances on the axes.
    assert_allclose(z_write, z_read, rtol=0.01, atol=0.01)
    assert_allclose(gridx, x_read)
    assert_allclose(gridy, y_read)
def test_ok_get_variogram_points(validation_ref):
    """Check ``get_variogram_points`` against the exponential model.

    Test to compare the variogram of OK results to those obtained using
    KT3D_H2O.
    (M. Karanovic, M. Tonkin, and D. Wilson, 2009, Groundwater,
    vol. 47, no. 4, 580-586.)

    Parameters
    ----------
    validation_ref : tuple
        Fixture value; only its first element (the data array whose
        first three columns are passed to ``OrdinaryKriging``) is used.
        The remaining elements are unpacked and ignored.
    """
    # Variogram parameters
    _variogram_parameters = [500.0, 3000.0, 0.0]

    data, _, (_ok_test_answer, _gridx, _gridy) = validation_ref

    ok = OrdinaryKriging(
        data[:, 0],
        data[:, 1],
        data[:, 2],
        variogram_model="exponential",
        variogram_parameters=_variogram_parameters,
    )

    # Get the variogram points from the OrdinaryKriging instance
    lags, calculated_variogram = ok.get_variogram_points()

    # Generate the expected variogram points according to the
    # exponential variogram model
    expected_variogram = variogram_models.exponential_variogram_model(
        _variogram_parameters, lags
    )

    # Without this comparison the test could never fail.
    assert_allclose(calculated_variogram, expected_variogram)
# Create ordinary kriging object:
OK = OrdinaryKriging(
lon,
lat,
z,
variogram_model="linear",
verbose=False,
enable_plotting=False,
coordinates_type="geographic",
)
# Execute on grid:
z1, ss1 = OK.execute("grid", grid_lon, grid_lat)
# Create ordinary kriging object ignoring curvature:
OK = OrdinaryKriging(
lon, lat, z, variogram_model="linear", verbose=False, enable_plotting=False
)
# Execute on grid:
z2, ss2 = OK.execute("grid", grid_lon, grid_lat)
# Print data at equator (last longitude index will show periodicity):
print("Original data:")
print("Longitude:", lon.astype(int))
print("Latitude: ", lat.astype(int))
print("z: ", np.array_str(z, precision=2))
print("\nKrige at 60° latitude:\n======================")
print("Longitude:", grid_lon)
print("Value: ", np.array_str(z1[5, :], precision=2))
print("Sigma²: ", np.array_str(ss1[5, :], precision=2))
print("\nIgnoring curvature:\n=====================")
number of dimensions (default=2)
Returns
-------
res : dict
a dictionary with the timing results
"""
X_train = np.random.rand(n_train, n_dim)
y_train = np.random.rand(n_train)
X_test = np.random.rand(n_test, n_dim)
res = {}
for variogram_model in VARIOGRAM_MODELS:
tic = time()
OK = OrdinaryKriging(X_train[:, 0], X_train[:, 1], y_train,
variogram_model='linear',
verbose=False, enable_plotting=False)
res['t_train_{}'.format(variogram_model)] = time() - tic
# All the following tests are performed with the linear variogram model
for backend in BACKENDS:
for n_closest_points in N_MOVING_WINDOW:
if backend == 'vectorized' and n_closest_points is not None:
continue # this is not supported
tic = time()
OK.execute('points', X_test[:, 0], X_test[:, 1],
backend=backend,
n_closest_points=n_closest_points)
res['t_test_{}_{}'.format(backend, n_closest_points)] = time() - tic