Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
]
# Pick the first dataset from the list assembled above (list head is outside
# this chunk) and announce which file is being processed.
mat_file = mat_file_list[0]
# Human-readable name: the filename with its '.mat' extension stripped.
mat_file_name = mat_file.replace('.mat', '')
print("\n... Processing", mat_file_name, '...')
# Load the MATLAB file from the local 'datasets' directory.
# NOTE(review): loadmat raises on a missing/corrupt file — presumably caught
# by the enclosing try (its `try:` line is outside this chunk; see the
# except TypeError / except IOError handlers below).
mat = sp.io.loadmat(os.path.join('', 'datasets', mat_file))
X = mat['X']
y = mat['y']
# split dataset into train and test
X_train, X_test, y_train, y_test = \
train_test_split(X, y, test_size=0.4, random_state=42)
# standardize data to be digestible for most algorithms
X_train, X_test = standardizer(X_train, X_test)
# Outlier fraction observed in the full label vector; fed to the detectors
# so their decision thresholds match the data.
contamination = y.sum() / len(y)
# get estimators for training and prediction
base_estimators = get_estimators(contamination=contamination)
##########################################################################
# Build the SUOD meta-framework around the base detectors:
# rp_flag_global / bps_flag / approx_flag_global enable random projection,
# balanced parallel scheduling, and model approximation respectively
# (approx_clf and n_jobs are defined earlier, outside this chunk).
model = SUOD(base_estimators=base_estimators, rp_flag_global=True,
approx_clf=approx_clf,
n_jobs=n_jobs, bps_flag=True, contamination=contamination,
approx_flag_global=True)
# Time the unsupervised fit over the training split only.
start = time.time()
model.fit(X_train)  # fit all models with X
print('Fit time:', time.time() - start)
print()
# Fallback handlers for the dataset-loading `try` (its opening line is
# outside this chunk): if the .mat file cannot be read, synthesize data
# instead of aborting.
except TypeError:
print('{data_file} does not exist. Use generated data'.format(
data_file=mat_file))
# Best-effort fallback: replace the missing dataset with generated data.
X, y = generate_data(train_only=True)  # load data
except IOError:
print('{data_file} does not exist. Use generated data'.format(
data_file=mat_file))
X, y = generate_data(train_only=True)  # load data
# Success path of the dataset-loading `try` above: the .mat file loaded,
# so pull the feature matrix and (flattened) label vector from it.
else:
X = mat['X']
y = mat['y'].ravel()
# NOTE(review): no random_state here, unlike the split earlier in the file —
# this split is not reproducible across runs; confirm that is intended.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4)
# standardizing data for processing
X_train_norm, X_test_norm = standardizer(X_train, X_test)
n_clf = 20  # number of base detectors
# Initialize 20 base detectors for combination
# One k per detector: neighborhood sizes 10, 20, ..., 200.
k_list = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140,
150, 160, 170, 180, 190, 200]
# Per-sample score matrices, one column per base detector.
train_scores = np.zeros([X_train.shape[0], n_clf])
test_scores = np.zeros([X_test.shape[0], n_clf])
print('Combining {n_clf} kNN detectors'.format(n_clf=n_clf))
# Fit one kNN detector per k (loop body continues past this chunk).
for i in range(n_clf):
k = k_list[i]
# 'largest' uses the distance to the k-th neighbor as the outlier score.
clf = KNN(n_neighbors=k, method='largest')
y : Ignored
Not used, present for API consistency by convention.
Returns
-------
self : object
Fitted estimator.
"""
# validate inputs X and y (optional)
X = check_array(X)
# Record the number of classes for the estimator (pyod-style bookkeeping).
self._set_n_classes(y)
# PCA is recommended to use on the standardized data (zero mean and
# unit variance).
if self.standardization:
# keep_scalar=True also returns the fitted scaler so transform-time
# data can be standardized identically.
X, self.scaler_ = standardizer(X, keep_scalar=True)
# Delegate the decomposition itself to scikit-learn's PCA, forwarding
# the user-supplied hyperparameters unchanged.
self.detector_ = sklearn_PCA(n_components=self.n_components,
copy=self.copy,
whiten=self.whiten,
svd_solver=self.svd_solver,
tol=self.tol,
iterated_power=self.iterated_power,
random_state=self.random_state)
self.detector_.fit(X=X, y=y)
# copy the attributes from the sklearn PCA object
self.n_components_ = self.detector_.n_components_
self.components_ = self.detector_.components_
# validate the number of components to be used for outlier detection
if self.n_selected_components is None:
if self.n_selected_components is None:
# ensure local region size is within acceptable limits
# Clamp into [local_region_min, local_region_max].
self.local_region_size = max(self.local_region_size, self.local_region_min)
self.local_region_size = min(self.local_region_size, self.local_region_max)
# standardize test data and get local region for each test instance
# NOTE(review): X is used as-is here — presumably already standardized by
# the caller or earlier in this method (head of method not in this chunk).
X_test_norm = X
# ind_arr[i] holds the indices of the training points forming test
# instance i's local region.
ind_arr = self._get_local_region(X_test_norm)
# calculate test scores
# One score column per base detector in the ensemble.
test_scores = np.zeros([X_test_norm.shape[0], self.n_clf])
for k, estimator in enumerate(self.estimator_list):
test_scores[:, k] = estimator.decision_function(X_test_norm)
# generate standardized scores
train_scores_norm, test_scores_norm = standardizer(self.train_scores_, test_scores)
# generate pseudo target for training --> for calculating weights
# Pseudo ground truth = per-sample maximum across detectors (LSCP scheme).
self.training_pseudo_label_ = np.max(train_scores_norm, axis=1).reshape(-1, 1)
# placeholder for predictions
pred_scores_ens = np.zeros([X_test_norm.shape[0], ])
# iterate through test instances (ind_arr indices correspond to x_test)
for i, ind_k in enumerate(ind_arr):
# get pseudo target and training scores in local region of test instance
local_pseudo_ground_truth = self.training_pseudo_label_[ind_k,].ravel()
local_train_scores = train_scores_norm[ind_k, :]
# calculate pearson correlation between local pseudo ground truth and local train scores
# (correlation computation continues past this chunk)
pearson_corr_scores = np.zeros([self.n_clf, ])
Returns
-------
agg_score: numpy array of shape (n_samples,)
Aggregated scores.
"""
# Collect one score column per base estimator: every estimator must
# expose decision_function, otherwise aggregation is impossible.
all_scores = np.zeros([X.shape[0], self.n_base_estimators_])
for i, clf in enumerate(self.base_estimators):
if hasattr(clf, 'decision_function'):
all_scores[:, i] = clf.decision_function(X)
else:
raise ValueError(
"{clf} does not have decision_function.".format(clf=clf))
# Optionally z-standardize the columns so detectors with different score
# scales contribute comparably.
if self.standardization:
all_scores = standardizer(all_scores)
# Combine columns into a single score per sample.
# NOTE(review): if self.method is none of 'average'/'maximization'/'median',
# agg_score is never assigned and the return raises UnboundLocalError —
# presumably self.method is validated upstream; confirm.
if self.method == 'average':
agg_score = average(all_scores, estimator_weights=self.weights)
if self.method == 'maximization':
agg_score = maximization(all_scores)
if self.method == 'median':
agg_score = median(all_scores)
return agg_score
jl_transformers,
approx_flags[starts[i]:starts[i + 1]],
verbose=True)
for i in range(n_jobs))
print('Orig decision_function time:', time.time() - start)
print()
# unfold and generate the label matrix
# Re-assemble the per-job score chunks (shape: n_samples x n_estimators);
# starts[] holds the estimator-index boundaries assigned to each job.
predicted_scores_orig = np.zeros([X_test.shape[0], n_estimators])
for i in range(n_jobs):
predicted_scores_orig[:, starts[i]:starts[i + 1]] = np.asarray(
all_results_scores[i]).T
##########################################################################
# Standardize both score matrices so the accelerated ('new') and original
# pipelines are compared on the same scale.
# (predicted_scores is produced earlier, outside this chunk.)
predicted_scores = standardizer(predicted_scores)
predicted_scores_orig = standardizer(predicted_scores_orig)
# Report ROC/precision for average-combination and maximum-of-average (moa)
# of both pipelines against the held-out labels.
evaluate_print('orig', y_test, average(predicted_scores_orig))
evaluate_print('new', y_test, average(predicted_scores))
evaluate_print('orig moa', y_test, moa(predicted_scores_orig))
evaluate_print('new moa', y_test, moa(predicted_scores))