Parameters
----------
ts_to_be_rescaled : numpy.ndarray
A time series dataset of base modalities of shape (n_ts, sz, d) with
``d = self.reference_series_.shape[-1]``
"""
ts_to_be_rescaled = to_time_series_dataset(ts_to_be_rescaled)
# Now ts_to_be_rescaled is of shape (n_ts, sz, d)
# with d = self.reference_series_.shape[-1]
self.saved_dtw_paths_ = []
for ts in ts_to_be_rescaled:
    end = first_non_finite_index(ts)
    resampled_ts = _resampled(ts[:end], n_samples=self.n_samples, kind=self.interp_kind)
    if self.metric == "dtw":
        path, d = dtw_path(self.reference_series_, resampled_ts)
    elif self.metric == "lrdtw":
        path, d = lr_dtw_path(self.reference_series_, resampled_ts, gamma=self.gamma_lr_dtw)
    else:
        raise ValueError("Unknown alignment function")
    self.saved_dtw_paths_.append(path)
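# Aside (not from the snippet above): a minimal standalone sketch of the same
# tslearn.metrics.dtw_path call; `reference` and `candidate` are made-up arrays
# used purely for illustration.
import numpy as np
from tslearn.metrics import dtw_path

reference = np.linspace(0.0, 1.0, 50).reshape(-1, 1)  # hypothetical reference series
candidate = np.linspace(0.0, 1.0, 30).reshape(-1, 1)  # shorter series to align to it

path, dist = dtw_path(reference, candidate)
# `path` is a list of (i, j) pairs matching reference[i] to candidate[j];
# `dist` is the corresponding DTW distance.
print(dist, path[:5])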
def _petitjean_assignment(X, barycenter):
    # For each barycenter index t, record which series (assign[0][t]) and
    # which of their timestamps (assign[1][t]) are matched to t by DTW.
    n = X.shape[0]
    barycenter_size = barycenter.shape[0]
    assign = ([[] for _ in range(barycenter_size)],
              [[] for _ in range(barycenter_size)])
    for i in range(n):
        path, _ = dtw_path(X[i], barycenter)
        for pair in path:
            assign[0][pair[1]].append(i)
            assign[1][pair[1]].append(pair[0])
    return assign
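# Rough usage sketch (assumes the `_petitjean_assignment` helper above and a toy
# dataset; the mean update below is the standard DBA barycenter-update step, not
# code taken from this snippet).
import numpy as np
from tslearn.generators import random_walks

X = random_walks(n_ts=5, sz=40, d=1)
barycenter = X.mean(axis=0)  # naive initialisation, shape (40, 1)

assign = _petitjean_assignment(X, barycenter)
# Update each barycenter point as the mean of all observations assigned to it.
new_barycenter = np.array([
    X[assign[0][t], assign[1][t]].mean(axis=0)
    for t in range(barycenter.shape[0])
])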
# License: BSD 3 clause
import numpy
import matplotlib.pyplot as plt
from tslearn.generators import random_walks
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from tslearn import metrics
numpy.random.seed(0)
n_ts, sz, d = 2, 100, 1
dataset = random_walks(n_ts=n_ts, sz=sz, d=d)
scaler = TimeSeriesScalerMeanVariance(mu=0., std=1.) # Rescale time series
dataset_scaled = scaler.fit_transform(dataset)
path, sim = metrics.dtw_path(dataset_scaled[0], dataset_scaled[1])
matrix_path = numpy.zeros((sz, sz), dtype=int)  # numpy.int was removed in recent NumPy
for i, j in path:
    matrix_path[i, j] = 1
plt.figure()
plt.subplot2grid((1, 3), (0, 0), colspan=2)
plt.plot(numpy.arange(sz), dataset_scaled[0, :, 0])
plt.plot(numpy.arange(sz), dataset_scaled[1, :, 0])
plt.subplot(1, 3, 3)
plt.imshow(matrix_path, cmap="gray_r")
plt.tight_layout()
plt.show()
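# Follow-up sketch, reusing `dataset_scaled` from the example above: with more
# than two series, tslearn.metrics.cdist_dtw returns all pairwise DTW distances.
from tslearn.metrics import cdist_dtw

distances = cdist_dtw(dataset_scaled)  # shape (n_ts, n_ts), zeros on the diagonal
print(distances)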
def _petitjean_assignment(self, X, barycenter):
    n = X.shape[0]
    assign = ([[] for _ in range(self.barycenter_size)],
              [[] for _ in range(self.barycenter_size)])
    for i in range(n):
        path, _ = dtw_path(X[i], barycenter)
        for pair in path:
            assign[0][pair[1]].append(i)
            assign[1][pair[1]].append(pair[0])
    return assign
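# For context (a sketch of the equivalent public API, not of the private method
# above): tslearn.barycenters.dtw_barycenter_averaging runs the full DBA loop,
# of which the assignment step shown here is one part.
from tslearn.barycenters import dtw_barycenter_averaging
from tslearn.generators import random_walks

X = random_walks(n_ts=5, sz=40, d=1)
barycenter = dtw_barycenter_averaging(X, barycenter_size=20, max_iter=10)
print(barycenter.shape)  # (20, 1)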
inputs, target, breakpoints = data
inputs = torch.tensor(inputs, dtype=torch.float32).to(device)
target = torch.tensor(target, dtype=torch.float32).to(device)
batch_size, N_output = target.shape[0:2]
outputs = net(inputs)

# MSE
loss_mse = criterion(target, outputs)

loss_dtw, loss_tdi = 0, 0
# DTW and TDI (computed per sample on CPU)
for k in range(batch_size):
    target_k_cpu = target[k, :, 0:1].view(-1).detach().cpu().numpy()
    output_k_cpu = outputs[k, :, 0:1].view(-1).detach().cpu().numpy()

    loss_dtw += dtw(target_k_cpu, output_k_cpu)
    path, sim = dtw_path(target_k_cpu, output_k_cpu)

    # Temporal Distortion Index: squared deviation of the warping path
    # from the diagonal, normalised by the squared output length
    Dist = 0
    for i, j in path:
        Dist += (i - j) * (i - j)
    loss_tdi += Dist / (N_output * N_output)

loss_dtw = loss_dtw / batch_size
loss_tdi = loss_tdi / batch_size

# print statistics
losses_mse.append(loss_mse.item())
losses_dtw.append(loss_dtw)
losses_tdi.append(loss_tdi)
print(' Eval mse= ', np.array(losses_mse).mean(),
      ' dtw= ', np.array(losses_dtw).mean(),
      ' tdi= ', np.array(losses_tdi).mean())
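# Standalone sketch of the TDI term computed in the loop above; the helper name
# `temporal_distortion_index` is hypothetical, not taken from the snippet.
import numpy as np
from tslearn.metrics import dtw_path

def temporal_distortion_index(target_1d, output_1d):
    # Mean squared deviation of the DTW warping path from the diagonal,
    # normalised by the squared series length.
    path, _ = dtw_path(target_1d, output_1d)
    n = len(target_1d)
    return sum((i - j) ** 2 for i, j in path) / (n * n)

# Identical series warp along the diagonal, so the TDI is 0.
series = np.sin(np.linspace(0, 3, 20))
print(temporal_distortion_index(series, series))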