Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _fit(self, X, y=None):
    """Fit the parent SAX model, then derive the slope quantization bins.

    Parameters
    ----------
    X : array-like exposing a three-element ``shape``
        Only the second dimension (series length) is used here, to work
        out the number of time steps per segment.
    y
        Passed through unchanged to the parent ``_fit``.

    Returns
    -------
    self
        With ``breakpoints_slope_`` and ``breakpoints_slope_middle_`` set.
    """
    SymbolicAggregateApproximation._fit(self, X, y)

    # The unpacking still requires a 3-element shape; only the middle
    # entry is needed afterwards.
    _, series_length, _ = X.shape
    segment_length = series_length // self.n_segments

    # Use the user-supplied scale when there is one, otherwise fall back
    # to sqrt(0.03 / segment_length).
    configured_scale = self.sigma_l
    slope_scale = (numpy.sqrt(0.03 / segment_length)
                   if configured_scale is None else configured_scale)

    self.breakpoints_slope_ = _breakpoints(
        self.alphabet_size_slope, scale=slope_scale)
    self.breakpoints_slope_middle_ = _bin_medians(
        self.alphabet_size_slope, scale=slope_scale)
    return self
# --- Experiment 2: 3-nearest-neighbor classification with the "dtw" metric.
# Fits on (X_train, y_train), predicts X_test and prints accuracy_score
# against y_test.
knn_clf = KNeighborsTimeSeriesClassifier(n_neighbors=3, metric="dtw")
knn_clf.fit(X_train, y_train)
predicted_labels = knn_clf.predict(X_test)
print("\n2. Nearest neighbor classification using DTW")
print("Correct classification rate:", accuracy_score(y_test, predicted_labels))
# --- Experiment 3: same protocol with the "euclidean" metric (L2).
# Note that knn_clf and predicted_labels are rebound here; the DTW results
# above are no longer reachable afterwards.
knn_clf = KNeighborsTimeSeriesClassifier(n_neighbors=3, metric="euclidean")
knn_clf.fit(X_train, y_train)
predicted_labels = knn_clf.predict(X_test)
print("\n3. Nearest neighbor classification using L2")
print("Correct classification rate:", accuracy_score(y_test, predicted_labels))
# --- Experiment 4: SAX transform (10 segments, 5 symbols for averages)
# chained with a 3-nearest-neighbor classifier in a Pipeline, so the
# classifier is fitted on / predicts from the SAX representation.
# NOTE(review): the printed label says "SAX+MINDIST" while the classifier is
# built with metric="euclidean" -- confirm which distance is actually applied
# to the SAX symbols.
sax_trans = SymbolicAggregateApproximation(n_segments=10, alphabet_size_avg=5)
knn_clf = KNeighborsTimeSeriesClassifier(n_neighbors=3, metric="euclidean")
pipeline_model = Pipeline(steps=[('sax', sax_trans), ('knn', knn_clf)])
pipeline_model.fit(X_train, y_train)
predicted_labels = pipeline_model.predict(X_test)
print("\n4. Nearest neighbor classification using SAX+MINDIST")
print("Correct classification rate:", accuracy_score(y_test, predicted_labels))
def _transform(self, X, y=None):
    """Encode ``X`` as 1d-SAX symbols: average symbols, then slope symbols.

    Parameters
    ----------
    X : array-like exposing a three-element ``shape`` ``(n_ts, sz, d)``
        Dataset to encode.
    y
        Ignored.

    Returns
    -------
    numpy.ndarray of integers, shape ``(n_ts, n_segments, 2 * d)``
        ``[:, :, :d]`` holds the symbols produced by the parent
        ``SymbolicAggregateApproximation._transform``; ``[:, :, d:]`` holds
        the symbols obtained by quantizing the per-segment slopes against
        ``breakpoints_slope_``.

    Raises
    ------
    ValueError
        If ``size_fitted_`` is negative, i.e. the model has not been fitted.
    """
    if self.size_fitted_ < 0:
        raise ValueError("Model not fitted yet: cannot be used for " +
                         "distance computation.")
    n_ts, _, d = X.shape
    # ``numpy.int`` was a plain alias of the builtin ``int``; it was
    # deprecated in NumPy 1.20 and removed in 1.24, so use ``int`` directly
    # (same resulting dtype, works on every NumPy version).
    X_1d_sax = numpy.empty((n_ts, self.n_segments, 2 * d), dtype=int)

    # Average: first d channels come from the plain SAX encoding.
    X_1d_sax_avg = SymbolicAggregateApproximation._transform(self, X)

    # Slope: last d channels are the quantized per-segment slopes.
    X_slopes = self._get_slopes(X)
    X_1d_sax_slope = _paa_to_symbols(X_slopes, self.breakpoints_slope_)

    X_1d_sax[:, :, :d] = X_1d_sax_avg
    X_1d_sax[:, :, d:] = X_1d_sax_slope
    return X_1d_sax
# Seed NumPy's global random generator with a fixed value
# (presumably so that the random walk generated below is reproducible).
numpy.random.seed(0)
# Generate a random walk time series: 1 series, 100 time steps, 1 dimension
n_ts, sz, d = 1, 100, 1
dataset = random_walks(n_ts=n_ts, sz=sz, d=d)
scaler = TimeSeriesScalerMeanVariance(mu=0., std=1.) # Rescale time series
dataset = scaler.fit_transform(dataset)
# PAA transform (and inverse transform) of the data, using 10 segments.
# The inverse-transformed result is kept for comparison with the raw series.
n_paa_segments = 10
paa = PiecewiseAggregateApproximation(n_segments=n_paa_segments)
paa_dataset_inv = paa.inverse_transform(paa.fit_transform(dataset))
# SAX transform: same number of segments as PAA, 8 symbols for averages
n_sax_symbols = 8
sax = SymbolicAggregateApproximation(n_segments=n_paa_segments,
alphabet_size_avg=n_sax_symbols)
sax_dataset_inv = sax.inverse_transform(sax.fit_transform(dataset))
# 1d-SAX transform: 8 symbols for averages and 8 symbols for slopes.
# The transformed symbols are kept in their own variable before being
# mapped back with inverse_transform.
n_sax_symbols_avg = 8
n_sax_symbols_slope = 8
one_d_sax = OneD_SymbolicAggregateApproximation(
n_segments=n_paa_segments,
alphabet_size_avg=n_sax_symbols_avg,
alphabet_size_slope=n_sax_symbols_slope)
transformed_data = one_d_sax.fit_transform(dataset)
one_d_sax_dataset_inv = one_d_sax.inverse_transform(transformed_data)
# Plotting: a 2x2 grid of subplots; the first cell shows the (rescaled)
# raw series as a solid blue line.
plt.figure()
plt.subplot(2, 2, 1) # First, raw time series
plt.plot(dataset[0].ravel(), "b-")
A dataset of SAX series.
Returns
-------
numpy.ndarray of shape (n_ts, sz_original_ts, d)
A dataset of time series corresponding to the provided
representation.
"""
X_ = numpy.array(X, dtype=numpy.int)
return inv_transform_sax(
X_,
breakpoints_middle_=self.breakpoints_avg_middle_,
original_size=self.size_fitted_)
class OneD_SymbolicAggregateApproximation(SymbolicAggregateApproximation):
    """One-D Symbolic Aggregate approXimation (1d-SAX) transformation.

    1d-SAX was originally presented in [1]_.

    Parameters
    ----------
    n_segments : int
        Number of PAA segments to compute.
    alphabet_size_avg : int
        Number of SAX symbols to use to describe average values.
    alphabet_size_slope : int
        Number of SAX symbols to use to describe slopes.
    sigma_l : float or None (default: None)
        Scale parameter of the Gaussian distribution used to quantize slopes.
        If None, the formula given in [1]_ is
        used: :math:`\\sigma_L = \\sqrt{0.03 / L}` where :math:`L` is the
        length of a segment (series length // ``n_segments``).
    """
    # NOTE(review): the docstring was truncated in this copy of the file (it
    # was cut mid-sentence and never closed, which turned ``__init__`` into
    # string content). The sentence was completed and the docstring closed;
    # the "References" entry backing ``[1]_`` is not visible here and should
    # be restored from the upstream source.

    def __init__(self, n_segments, alphabet_size_avg, alphabet_size_slope,
                 sigma_l=None):
        """Store the slope settings on top of the parent SAX configuration.

        ``n_segments`` and ``alphabet_size_avg`` are forwarded to
        ``SymbolicAggregateApproximation.__init__``; ``alphabet_size_slope``
        and ``sigma_l`` are stored as given.
        """
        SymbolicAggregateApproximation.__init__(
            self, n_segments,
            alphabet_size_avg=alphabet_size_avg)
        self.alphabet_size_slope = alphabet_size_slope
        self.sigma_l = sigma_l

        # Do that at fit time when we have sigma_l for sure
        self.breakpoints_slope_ = None
        self.breakpoints_slope_middle_ = None