Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Fit each classifier.
Parameters
----------
X : array-like, shape = (n_samples, n_features, n_timestamps)
Multivariate time series.
y : None or array-like, shape = (n_samples,)
Class labels.
Returns
-------
self : object
"""
X = check_3d_array(X)
_, n_features, _ = X.shape
self._check_params(n_features)
if self.weights is None:
self._weights = None
else:
self._weights = np.asarray(self.weights)
self._le = LabelEncoder().fit(y)
self.classes_ = self._le.classes_
y_ind = self._le.transform(y)
for i, clf in enumerate(self.estimators_):
clf.fit(X[:, i, :], y_ind)
return self
def fit(self, X, y):
"""Fit the model according to the given training data.
X : array-like, shape = (n_samples, n_features, n_timestamps)
Multivariate time series.
y : array-like, shape = (n_samples,)
Class labels.
Returns
-------
self : object
"""
X = check_3d_array(X)
_, n_features, n_timestamps = X.shape
X_diff = np.abs(np.diff(X))
estimator = WEASEL(
word_size=self.word_size, n_bins=self.n_bins,
window_sizes=self.window_sizes, window_steps=self.window_steps,
anova=self.anova, drop_sum=self.drop_sum, norm_mean=self.norm_mean,
norm_std=self.norm_std, strategy=self.strategy,
chi2_threshold=self.chi2_threshold, sparse=self.sparse,
alphabet=self.alphabet
)
self._estimators = [clone(estimator) for _ in range(n_features)]
self._estimators_diff = [clone(estimator) for _ in range(n_features)]
self.vocabulary_ = {}
"""Pass.
Parameters
----------
X : array-like, shape = (n_samples, n_features, n_timestamps)
Multivariate time series.
y : None or array-like, shape = (n_samples,) (default = None)
Class labels.
Returns
-------
self : object
"""
X = check_3d_array(X)
_, n_features, _ = X.shape
self._check_params(n_features)
for i, transformer in enumerate(self.estimators_):
transformer.fit(X[:, i, :], y)
return self
def fit_transform(self, X, y):
"""Fit the data then transform it.
X : array-like, shape = (n_samples, n_features, n_timestamps)
Multivariate time series.
y : array-like, shape = (n_samples,)
Class labels.
Returns
-------
X_new : array, shape = (n_samples, n_features_new)
Document-term matrix with relevant features only.
"""
X = check_3d_array(X)
n_samples, n_features, n_timestamps = X.shape
X_diff = np.abs(np.diff(X))
estimator = WEASEL(
word_size=self.word_size, n_bins=self.n_bins,
window_sizes=self.window_sizes, window_steps=self.window_steps,
anova=self.anova, drop_sum=self.drop_sum, norm_mean=self.norm_mean,
norm_std=self.norm_std, strategy=self.strategy,
chi2_threshold=self.chi2_threshold, sparse=self.sparse,
alphabet=self.alphabet
)
self._estimators = [clone(estimator) for _ in range(n_features)]
self._estimators_diff = [clone(estimator) for _ in range(n_features)]
self.vocabulary_ = {}
def transform(self, X):
r"""Apply transform to each feature.
Parameters
----------
X : array-like, shape = (n_samples, n_features, n_timestamps)
Multivariate time series.
Returns
-------
X_new : array, shape = (n_samples, *) or (n_samples, n_features, *)
Transformed time series.
"""
X = check_3d_array(X)
n_samples, _, _ = X.shape
check_is_fitted(self, 'estimators_')
X_transformed = [transformer.transform(X[:, i, :])
for i, transformer in enumerate(self.estimators_)]
all_sparse = np.all([isinstance(X_transformed_i, csr_matrix)
for X_transformed_i in X_transformed])
if all_sparse:
X_new = hstack(X_transformed)
else:
X_new = [self._convert_to_array(X_transformed_i)
for X_transformed_i in X_transformed]
ndims = [X_new_i.ndim for X_new_i in X_new]
shapes = [X_new_i.shape for X_new_i in X_new]
one_dim = (np.unique(ndims).size == 1)
if one_dim:
"""Transform each time series into a joint recurrence plot.
Parameters
----------
X : array-like, shape = (n_samples, n_features, n_timestamps)
Multivariate time series.
Returns
-------
X_new : array, shape = (n_samples, image_size, image_size)
Joint Recurrence plots. ``image_size`` is the number of
trajectories and is equal to
``n_timestamps - (dimension - 1) * time_delay``.
"""
X = check_3d_array(X)
_, n_features, _ = X.shape
thresholds_, percentages_ = self._check_params(n_features)
X_rp = [self._joint_recurrence_plot(
X[:, i, :], self.dimension, self.time_delay,
thresholds_[i], percentages_[i]) for i in range(n_features)]
X_jrp = np.product(X_rp, axis=0)
return X_jrp
def predict(self, X):
"""Predict class labels using hard voting.
Parameters
----------
X : array-like, shape = (n_samples, n_features, n_timestamps)
Multivariate time series.
Returns
-------
y_pred : array, shape = (n_samples,)
Predicted class labels.
"""
X = check_3d_array(X)
check_is_fitted(self, 'estimators_')
n_samples, n_features, _ = X.shape
y_pred = np.empty((n_samples, n_features))
for i, clf in enumerate(self.estimators_):
y_pred[:, i] = clf.predict(X[:, i, :])
maj = _hard_vote(y_pred.astype('int64'), self._weights)
return self._le.inverse_transform(maj)
def transform(self, X):
"""Transform the provided data.
Parameters
----------
X : array-like, shape = (n_samples, n_features, n_timestamps)
Multivariate time series.
Returns
-------
X_new : sparse matrix, shape = (n_samples, n_features_new)
Document-term matrix with relevant learned features only.
"""
check_is_fitted(self, 'vocabulary_')
X = check_3d_array(X)
n_samples, _, _ = X.shape
X_diff = np.abs(np.diff(X))
X_new = []
for i, transformer in enumerate(self._estimators):
X_new.append(transformer.transform(X[:, i, :]))
for i, transformer in enumerate(self._estimators_diff):
X_new.append(transformer.transform(X_diff[:, i, :]))
if self.sparse:
return csr_matrix(hstack(X_new))
return np.hstack(X_new)