Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
step = 1 + l // len(device_ids)
# Start process pool for multi-GPU request
if len(device_ids) > 1: # pragma: no cover
mp.set_start_method("spawn", force=True)
p = mp.Pool(processes=len(device_ids))
results = [None] * len(device_ids)
QT_fnames = []
QT_first_fnames = []
for idx, start in enumerate(range(0, l, step)):
stop = min(l, start + step)
QT, QT_first = _get_QT(start, T_A, T_B, m)
QT_fname = core.array_to_temp_file(QT)
QT_first_fname = core.array_to_temp_file(QT_first)
QT_fnames.append(QT_fname)
QT_first_fnames.append(QT_first_fname)
if len(device_ids) > 1 and idx < len(device_ids) - 1: # pragma: no cover
# Spawn and execute in child process for multi-GPU request
results[idx] = p.apply_async(
_gpu_stump,
(
T_A_fname,
T_B_fname,
m,
stop,
excl_zone,
M_T_fname,
Σ_T_fname,
def naive_mass(Q, T, m, trivial_idx=None, excl_zone=0, ignore_trivial=False):
D = np.linalg.norm(
core.z_norm(core.rolling_window(T, m), 1) - core.z_norm(Q), axis=1
)
if ignore_trivial:
start = max(0, trivial_idx - excl_zone)
stop = min(T.shape[0] - Q.shape[0] + 1, trivial_idx + excl_zone)
D[start:stop] = np.inf
I = np.argmin(D)
P = D[I]
if P == np.inf:
I = -1
# Get left and right matrix profiles for self-joins
if ignore_trivial and trivial_idx > 0:
PL = np.inf
IL = -1
for i in range(trivial_idx):
def naive_mass(Q, T, m, trivial_idx, excl_zone):
    """Return a brute-force distance profile of ``Q`` against ``T``.

    Every length-``m`` window of ``T`` (from ``core.rolling_window``) and the
    query ``Q`` are passed through ``core.z_norm``; the 2-norm of their
    difference is taken along axis 1. Entries in the half-open index range
    around ``trivial_idx`` (``excl_zone`` positions on either side, clipped
    to the profile bounds) are overwritten with ``np.inf``.

    Parameters
    ----------
    Q : array
        Query subsequence.
    T : array
        Series that is sliced into windows of length ``m``.
    m : int
        Window length handed to ``core.rolling_window``.
    trivial_idx : int
        Center of the masked region.
    excl_zone : int
        Half-width of the masked region.

    Returns
    -------
    D : array
        Distance profile with the masked region set to ``np.inf``.
    """
    normalized_windows = core.z_norm(core.rolling_window(T, m), 1)
    D = np.linalg.norm(normalized_windows - core.z_norm(Q), axis=1)

    # Clip the masked slice to [0, number of windows].
    mask_begin = max(0, trivial_idx - excl_zone)
    mask_end = min(T.shape[0] - Q.shape[0] + 1, trivial_idx + excl_zone)
    D[mask_begin:mask_end] = np.inf

    return D
def test_mueen_calculate_distance_profile(Q, T):
    """Compare ``core.mueen_calculate_distance_profile`` with a naive profile.

    The reference profile is the 2-norm (along axis 1) of the difference
    between the ``core.z_norm``-ed rolling windows of ``T`` and the
    ``core.z_norm``-ed query ``Q``.
    """
    window_size = Q.shape[0]

    # Reference: brute-force distance of Q to every window of T.
    normalized_windows = core.z_norm(core.rolling_window(T, window_size), 1)
    expected = np.linalg.norm(normalized_windows - core.z_norm(Q), axis=1)

    # Implementation under test.
    actual = core.mueen_calculate_distance_profile(Q, T)

    npt.assert_almost_equal(expected, actual)
def naive_mass(Q, T, m, trivial_idx=None, excl_zone=0, ignore_trivial=False):
D = np.linalg.norm(
core.z_norm(core.rolling_window(T, m), 1) - core.z_norm(Q), axis=1
)
if ignore_trivial:
start = max(0, trivial_idx - excl_zone)
stop = min(T.shape[0] - Q.shape[0] + 1, trivial_idx + excl_zone)
D[start:stop] = np.inf
I = np.argmin(D)
P = D[I]
if P == np.inf:
I = -1
# Get left and right matrix profiles for self-joins
if ignore_trivial and trivial_idx > 0:
PL = np.inf
IL = -1
for i in range(trivial_idx):
def naive_mass(Q, T, m, trivial_idx=None, excl_zone=0, ignore_trivial=False):
D = np.linalg.norm(
core.z_norm(core.rolling_window(T, m), 1) - core.z_norm(Q), axis=1
)
if ignore_trivial:
start = max(0, trivial_idx - excl_zone)
stop = min(T.shape[0] - Q.shape[0] + 1, trivial_idx + excl_zone)
D[start:stop] = np.inf
I = np.argmin(D)
P = D[I]
if P == np.inf:
I = -1
if ignore_trivial and trivial_idx > 0:
PL = np.inf
IL = -1
for i in range(trivial_idx):
if D[i] < PL:
def naive_mass(Q, T, m, trivial_idx, excl_zone):
    """Compute a naive distance profile and mask the zone near ``trivial_idx``.

    The profile is the 2-norm, along axis 1, of the difference between the
    ``core.z_norm``-ed length-``m`` windows of ``T`` and the
    ``core.z_norm``-ed query ``Q``. Indices from ``trivial_idx - excl_zone``
    (floored at 0) up to, but not including, ``trivial_idx + excl_zone``
    (capped at ``T.shape[0] - Q.shape[0] + 1``) are set to ``np.inf``.

    Returns
    -------
    D : array
        The masked distance profile.
    """
    windows = core.rolling_window(T, m)
    diff = core.z_norm(windows, 1) - core.z_norm(Q)
    D = np.linalg.norm(diff, axis=1)

    # Bounds of the slice to blank out, clipped to the valid index range.
    upper_limit = T.shape[0] - Q.shape[0] + 1
    first = max(0, trivial_idx - excl_zone)
    last = min(upper_limit, trivial_idx + excl_zone)
    D[first:last] = np.inf

    return D
def test_multi_mass(T, m):
    """Check ``_multi_mass`` against a naive, row-by-row reference.

    The reference ("left") side builds one distance profile per row of ``T``
    with ``naive_mass`` and reduces them with ``naive_PI``. The "right" side
    calls ``_multi_mass`` with per-window means and standard deviations
    computed directly with NumPy. Both sides use the same ``trivial_idx``
    and ``excl_zone`` and their results are asserted to be almost equal.

    Parameters
    ----------
    T : array
        Input indexed as ``T[i]`` per row; ``T.shape[0]`` rows are processed.
    m : int
        Window size.
    """
    excl_zone = int(np.ceil(m / 4))
    trivial_idx = 2
    Q = core.rolling_window(T, m)

    # left: naive reference, one distance profile per row of T
    D = np.empty((Q.shape[0], Q.shape[1]))
    for i in range(T.shape[0]):
        D[i] = naive_mass(Q[i, 0], T[i], m, trivial_idx, excl_zone)
    left_P, left_I = naive_PI(D, trivial_idx)

    # right: implementation under test, fed with per-window mean/std
    M_T = np.empty((Q.shape[0], Q.shape[1]))
    Σ_T = np.empty((Q.shape[0], Q.shape[1]))
    for i in range(Q.shape[0]):
        M_T[i] = np.mean(Q[i], axis=1)
        Σ_T[i] = np.std(Q[i], axis=1)
    right_P, right_I = _multi_mass(Q[:, 0], T, m, M_T, Σ_T, trivial_idx, excl_zone)

    # Without these comparisons the test could never fail on a wrong result.
    npt.assert_almost_equal(left_P, right_P)
    npt.assert_almost_equal(left_I, right_I)
def test_stump_self_join_larger_window(T_A, T_B):
    """Compare ``stomp`` self-join output with ``naive_mass`` for larger windows.

    For each window size in ``[8, 16, 32]`` that is strictly smaller than
    ``len(T_B)``, the reference is built by calling ``naive_mass`` once per
    rolling window of ``T_B`` and is compared against
    ``stomp(T_B, m, ignore_trivial=True)`` after ``replace_inf`` has been
    applied to both arrays. ``T_A`` is accepted but not used.
    """
    for m in [8, 16, 32]:
        # Window sizes that do not fit inside T_B are skipped.
        if len(T_B) <= m:
            continue

        zone = int(np.ceil(m / 4))

        # Reference: one naive_mass result per subsequence of T_B.
        reference_rows = [
            naive_mass(Q, T_B, m, i, zone, True)
            for i, Q in enumerate(core.rolling_window(T_B, m))
        ]
        left = np.array(reference_rows, dtype=object)

        # Implementation under test.
        right = stomp(T_B, m, ignore_trivial=True)

        replace_inf(left)
        replace_inf(right)
        npt.assert_almost_equal(left, right)
def test_compute_mean_std(Q, T):
    """Compare ``core.compute_mean_std`` with direct NumPy computations.

    For the query ``Q`` the expected mean and standard deviation are computed
    by hand over its ``m = Q.shape[0]`` elements. For ``T`` they are computed
    per length-``m`` rolling window with ``np.mean`` / ``np.std``.
    """
    m = Q.shape[0]

    # Expected statistics of the query, written out explicitly.
    expected_μ_Q = np.sum(Q) / m
    expected_σ_Q = np.sqrt(np.sum(np.square(Q - expected_μ_Q) / m))

    # Expected per-window statistics of T.
    windows = core.rolling_window(T, m)
    expected_M_T = np.mean(windows, axis=1)
    expected_Σ_T = np.std(windows, axis=1)

    # Values produced by the function under test.
    actual_μ_Q, actual_σ_Q = core.compute_mean_std(Q, m)
    actual_M_T, actual_Σ_T = core.compute_mean_std(T, m)

    comparisons = (
        (expected_μ_Q, actual_μ_Q),
        (expected_σ_Q, actual_σ_Q),
        (expected_M_T, actual_M_T),
        (expected_Σ_T, actual_Σ_T),
    )
    for expected, actual in comparisons:
        npt.assert_almost_equal(expected, actual)