Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_multi_mass(T, m):
    """Compare ``_multi_mass`` with a naive row-by-row reference.

    Reference ("left") side: for every row ``i`` of ``T`` the distance
    profile between the first window of that row (``Q[i, 0]``) and ``T[i]``
    is computed with ``naive_mass``; the stacked profiles are then reduced
    with ``naive_PI``.

    Tested ("right") side: the per-window mean and standard deviation of
    each row are precomputed and handed to ``_multi_mass`` together with the
    same first windows.

    Parameters
    ----------
    T : array-like
        Input data; indexed as ``T[i]`` and passed to
        ``core.rolling_window``.
        NOTE(review): presumably a 2-D array of shape (rows, length) --
        confirm against the fixture / parametrization.
    m : int
        Window size.
    """
    excl_zone = int(np.ceil(m / 4))
    trivial_idx = 2
    Q = core.rolling_window(T, m)

    # left
    D = np.empty((Q.shape[0], Q.shape[1]))
    for i in range(T.shape[0]):
        D[i] = naive_mass(Q[i, 0], T[i], m, trivial_idx, excl_zone)
    left_P, left_I = naive_PI(D, trivial_idx)

    # right
    M_T = np.empty((Q.shape[0], Q.shape[1]))
    Σ_T = np.empty((Q.shape[0], Q.shape[1]))
    for i in range(Q.shape[0]):
        M_T[i] = np.mean(Q[i], axis=1)
        Σ_T[i] = np.std(Q[i], axis=1)
    right_P, right_I = _multi_mass(Q[:, 0], T, m, M_T, Σ_T, trivial_idx, excl_zone)

    # Without these comparisons the test could never fail, whatever
    # ``_multi_mass`` returned.
    npt.assert_almost_equal(left_P, right_P)
    npt.assert_almost_equal(left_I, right_I)
def test_stump_self_join_larger_window(T_A, T_B):
    """Self-join check of ``stomp`` on ``T_B`` for window sizes 8, 16 and 32.

    A window size is skipped unless ``len(T_B)`` is strictly greater than
    it. For every remaining size, one ``naive_mass`` result per rolling
    window of ``T_B`` is collected into an object array and compared with
    ``stomp(T_B, m, ignore_trivial=True)``. Both sides are passed through
    ``replace_inf`` before the comparison.

    ``T_A`` is accepted but not used by this test.
    """
    for m in (8, 16, 32):
        if len(T_B) <= m:
            continue

        excl_zone = int(np.ceil(m / 4))

        # Reference: one naive result per subsequence of T_B.
        rows = []
        for idx, window in enumerate(core.rolling_window(T_B, m)):
            rows.append(naive_mass(window, T_B, m, idx, excl_zone, True))
        expected = np.array(rows, dtype=object)

        actual = stomp(T_B, m, ignore_trivial=True)

        replace_inf(expected)
        replace_inf(actual)
        npt.assert_almost_equal(expected, actual)
def test_compute_mean_std(Q, T):
    """Check ``core.compute_mean_std`` against direct NumPy computations.

    The window size ``m`` is taken from ``Q.shape[0]``. For ``Q`` the
    expected mean and standard deviation are computed by hand; for ``T``
    they are computed over every rolling window of length ``m``.
    """
    m = Q.shape[0]

    # Expected statistics of the query, written out explicitly.
    expected_μ_Q = np.sum(Q) / m
    expected_σ_Q = np.sqrt(np.sum(np.square(Q - expected_μ_Q) / m))

    # Expected statistics of each length-m window of T.
    windows = core.rolling_window(T, m)
    expected_M_T = np.mean(windows, axis=1)
    expected_Σ_T = np.std(windows, axis=1)

    actual_μ_Q, actual_σ_Q = core.compute_mean_std(Q, m)
    actual_M_T, actual_Σ_T = core.compute_mean_std(T, m)

    npt.assert_almost_equal(expected_μ_Q, actual_μ_Q)
    npt.assert_almost_equal(expected_σ_Q, actual_σ_Q)
    npt.assert_almost_equal(expected_M_T, actual_M_T)
    npt.assert_almost_equal(expected_Σ_T, actual_Σ_T)
def test_compute_mean_std(Q, T):
    """Verify ``core.compute_mean_std`` for a query ``Q`` and a series ``T``.

    With ``m = Q.shape[0]``, the reference values are the hand-computed
    mean / standard deviation of ``Q`` and the ``np.mean`` / ``np.std`` of
    the rolling windows of ``T``. Each is compared with the corresponding
    output of ``core.compute_mean_std``.
    """
    m = Q.shape[0]

    # Reference values.
    ref_μ_Q = np.sum(Q) / m
    scaled_sq_dev = np.square(Q - ref_μ_Q) / m
    ref_σ_Q = np.sqrt(np.sum(scaled_sq_dev))
    ref_M_T = np.mean(core.rolling_window(T, m), axis=1)
    ref_Σ_T = np.std(core.rolling_window(T, m), axis=1)

    # Values under test.
    got_μ_Q, got_σ_Q = core.compute_mean_std(Q, m)
    got_M_T, got_Σ_T = core.compute_mean_std(T, m)

    comparisons = (
        (ref_μ_Q, got_μ_Q),
        (ref_σ_Q, got_σ_Q),
        (ref_M_T, got_M_T),
        (ref_Σ_T, got_Σ_T),
    )
    for reference, computed in comparisons:
        npt.assert_almost_equal(reference, computed)
def test_stumped_self_join(T_A, T_B, dask_client):
    """Self-join check of ``stumped`` on ``T_B`` with a window size of 3.

    The reference is an object array holding one ``naive_mass`` result per
    rolling window of ``T_B``. It is compared with
    ``stumped(dask_client, T_B, m, ignore_trivial=True)`` after both sides
    have been passed through ``replace_inf``.

    ``T_A`` is accepted but not used by this test.
    """
    window_size = 3
    exclusion = int(np.ceil(window_size / 4))

    subsequences = core.rolling_window(T_B, window_size)
    reference = np.array(
        [
            naive_mass(sub, T_B, window_size, pos, exclusion, True)
            for pos, sub in enumerate(subsequences)
        ],
        dtype=object,
    )

    computed = stumped(dask_client, T_B, window_size, ignore_trivial=True)

    replace_inf(reference)
    replace_inf(computed)
    npt.assert_almost_equal(reference, computed)
def test_mass(Q, T):
    """Check ``core.mass(Q, T)`` against an explicit distance computation.

    The reference is the norm (along axis 1) of the difference between the
    z-normalized rolling windows of ``T`` and the z-normalized ``Q``, with
    the window length taken from ``Q.shape[0]``.
    """
    m = Q.shape[0]

    # Normalize the windows first and the query second, as the original
    # single expression did.
    normalized_windows = core.z_norm(core.rolling_window(T, m), 1)
    normalized_query = core.z_norm(Q)
    expected = np.linalg.norm(normalized_windows - normalized_query, axis=1)

    actual = core.mass(Q, T)

    npt.assert_almost_equal(expected, actual)