Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
_sampling_type = 'bypass'
def _fit_resample(self, X, y):
    """Bypass resampling: hand the inputs back exactly as they were received."""
    features, labels = X, y
    return features, labels
# 2x2 grid: the first panel shows the data as generated, the remaining three
# show the output of one over-sampler each.
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 15))
X, y = create_dataset(n_samples=10000, weights=(0.01, 0.05, 0.94))

# Reference panel drawn with FakeSampler (presumably the pass-through sampler
# defined earlier in the example -- confirm against the full file).
sampler = FakeSampler()
clf = make_pipeline(sampler, LinearSVC())
plot_resampling(X, y, sampler, ax1)
ax1.set_title('Original data - y={}'.format(Counter(y)))

ax_arr = (ax2, ax3, ax4)
over_samplers = (
    RandomOverSampler(random_state=0),
    SMOTE(random_state=0),
    ADASYN(random_state=0),
)
for ax, sampler in zip(ax_arr, over_samplers):
    # Fit a pipeline with the sampler, then draw the resampled data.
    clf = make_pipeline(sampler, LinearSVC())
    clf.fit(X, y)
    plot_resampling(X, y, sampler, ax)
    ax.set_title('Resampling using {}'.format(sampler.__class__.__name__))
fig.tight_layout()
###############################################################################
# The following plot illustrates the difference between ADASYN and SMOTE. ADASYN
# will focus on the samples which are difficult to classify with a
# nearest-neighbors rule while regular SMOTE will not make any distinction.
# Therefore, the decision function differs depending on the algorithm.
# One row of three panels, plus a three-class dataset generated with the
# class weights 0.01 / 0.05 / 0.94.
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(20, 6))
X, y = create_dataset(n_samples=10000, weights=(0.01, 0.05, 0.94))
self.pos_samples = pos_samples
self.ratio_sampler = None
super(ModifiedRandomOverSampler, self).__init__(random_state=random_state)
def fit(self, X, y):
    """Fit an internal ``RandomOverSampler`` with explicit per-class counts.

    The ``ratio`` mapping passed to the sampler uses the number of entries
    in ``y`` equal to 0 for class 0 and ``self.pos_samples`` for class 1.
    The fitted sampler is kept on ``self.ratio_sampler``.

    Returns
    -------
    self
    """
    requested_pos = self.pos_samples
    current_neg = len(y[y == 0])
    class_counts = {0: current_neg, 1: requested_pos}
    self.ratio_sampler = RandomOverSampler(
        random_state=self.random_state,
        ratio=class_counts,
    )
    self.ratio_sampler.fit(X, y)
    return self
def sample(self, X, y):
    """Delegate resampling to the sampler held in ``self.ratio_sampler``."""
    delegate = self.ratio_sampler
    return delegate.sample(X, y)
class ModifiedSMOTE(SMOTE):
    """SMOTE wrapper that requests a fixed sample count for class 1.

    ``fit`` builds a fresh ``SMOTE`` whose ``ratio`` mapping uses the number
    of entries in ``y`` equal to 0 for class 0 and ``pos_samples`` for
    class 1; ``sample`` forwards to that inner sampler.
    """

    def __init__(self, pos_samples, random_state=0):
        # Set before the parent initialiser runs, matching the original order.
        self.pos_samples = pos_samples
        self.ratio_sampler = None  # populated by fit()
        super(ModifiedSMOTE, self).__init__(random_state=random_state)

    def fit(self, X, y):
        """Create and fit the inner ``SMOTE``; return ``self``."""
        requested_pos = self.pos_samples
        current_neg = len(y[y == 0])
        class_counts = {0: current_neg, 1: requested_pos}
        self.ratio_sampler = SMOTE(
            random_state=self.random_state,
            ratio=class_counts,
        )
        self.ratio_sampler.fit(X, y)
        return self

    def sample(self, X, y):
        """Resample ``X, y`` with the inner sampler created by ``fit``."""
        delegate = self.ratio_sampler
        return delegate.sample(X, y)
def fit(self, X, y):
    """Fit an internal ``SMOTE`` with explicit per-class counts.

    The ``ratio`` mapping passed to the sampler uses the number of entries
    in ``y`` equal to 0 for class 0 and ``self.pos_samples`` for class 1.
    The fitted sampler is kept on ``self.ratio_sampler``.

    Returns
    -------
    self
    """
    requested_pos = self.pos_samples
    current_neg = len(y[y == 0])
    class_counts = {0: current_neg, 1: requested_pos}
    self.ratio_sampler = SMOTE(
        random_state=self.random_state,
        ratio=class_counts,
    )
    self.ratio_sampler.fit(X, y)
    return self
from imblearn import pipeline as pl
from imblearn.metrics import (geometric_mean_score,
make_index_balanced_accuracy)
# Print the module docstring.
print(__doc__)
# Single seed shared by dataset generation, SMOTE, the classifier and the split.
RANDOM_STATE = 42
# Generate a dataset
# NOTE(review): n_classes=3 but only two weights are supplied. Per the
# scikit-learn documentation the last class weight is then inferred from the
# others -- confirm the resulting class proportions are the intended ones.
X, y = datasets.make_classification(n_classes=3, class_sep=2,
weights=[0.1, 0.9], n_informative=10,
n_redundant=1, flip_y=0, n_features=20,
n_clusters_per_class=4, n_samples=5000,
random_state=RANDOM_STATE)
# SMOTE followed by a linear SVM; the pipeline is only fitted on the training
# split further down.
pipeline = pl.make_pipeline(os.SMOTE(random_state=RANDOM_STATE),
LinearSVC(random_state=RANDOM_STATE))
# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y,
random_state=RANDOM_STATE)
# Train the classifier with balancing
pipeline.fit(X_train, y_train)
# Test the classifier and get the prediction
y_pred_bal = pipeline.predict(X_test)
###############################################################################
# The geometric mean corresponds to the square root of the product of the
# sensitivity and specificity. Combining the two metrics should account for
# the balancing of the dataset.
def fit(self, X, y):
    """Train ``self.n_estimators`` models on SMOTE-balanced bootstrap samples.

    For each bagging round the rows labelled 0 (treated as the majority
    class) and the rows labelled 1 (treated as the minority class) are each
    sampled with replacement, using a fraction that cycles through
    0.1, 0.2, ..., 1.0. The two samples are stacked, passed through
    ``SMOTE_IMB`` and used to fit one ``DT`` model. The fitted models are
    collected in ``self.model_list``.

    Parameters
    ----------
    X : anything accepted by ``pandas.DataFrame``
        Feature matrix.
    y : array-like
        Labels; only rows with label 0 or 1 take part in training.

    Returns
    -------
    self
    """
    self.model_list = []
    df = pd.DataFrame(X)
    df['label'] = y
    df_maj = df[df['label'] == 0]
    df_min = df[df['label'] == 1]
    feature_cols = [col for col in df.columns if col != 'label']
    for ibagging in range(self.n_estimators):
        # Sampling fraction grows from 0.1 to 1.0 and restarts every ten rounds.
        b = min(0.1 * ((ibagging % 10) + 1), 1)
        train_maj = df_maj.sample(frac=b, replace=True)
        train_min = df_min.sample(frac=b, replace=True)
        # DataFrame.append was removed in pandas 2.0; pd.concat stacks the
        # two frames the same way (original index kept, no sorting).
        df_k = pd.concat([train_maj, train_min])
        # k_neighbors is capped at one less than the minority sample size,
        # presumably because SMOTE cannot use more neighbours than there are
        # other minority samples.
        smote = SMOTE_IMB(k_neighbors=min(5, len(train_min) - 1))
        X_train, y_train = smote.fit_resample(df_k[feature_cols], df_k['label'])
        model = DT().fit(X_train, y_train)
        self.model_list.append(model)
    return self
###############################################################################
# Balancing the class before classification
###############################################################################
###############################################################################
# To improve the prediction of the class \#3, it could be interesting to apply
# balancing before training the naive Bayes classifier. Therefore, we will
# use a ``RandomUnderSampler`` to equalize the number of samples in all the
# classes before the training.
#
# It is also important to note that we are using the ``make_pipeline`` function
# implemented in imbalanced-learn to properly handle the samplers.
# Chain the TF-IDF vectorizer, the under-sampler and the classifier in one
# pipeline, fitted on the training split only.
pipe = make_pipeline_imb(TfidfVectorizer(),
RandomUnderSampler(),
MultinomialNB())
pipe.fit(X_train, y_train)
y_pred = pipe.predict(X_test)
###############################################################################
# Although the results are almost identical, it can be seen that the resampling
# helped to correct the poor recall of the class \#3 at the cost of reducing
# the other metrics for the other classes. However, the overall results are
# slightly better.
print(classification_report_imbalanced(y_test, y_pred))
test_size=(1. - share_train),
stratify=stratify
)
self.dependent = y.name
if X_label:
self.independent = X_label
else:
self.independent = list(X.columns.values)
self.balance = balance
if balance == 'upsample':
ros = RandomOverSampler()
X_resample, y_resample = ros.fit_sample(X_train, y_train)
elif balance == 'downsample':
rus = RandomUnderSampler()
X_resample, y_resample = rus.fit_sample(X_train, y_train)
else:
X_resample = X
y_resample = y
self.X_train, X_test, self.y_train, y_test = train_test_split(
X_resample,
y_resample,
test_size=(1. - share_train),
stratify=stratify
)
###############################################################################
# ``InstanceHardnessThreshold`` uses the predictions of a classifier to exclude
# samples. All samples which are classified with a low probability will be
# removed.
# Three panels: the baseline classifier, the classifier trained behind the
# sampler, and the resampled data itself.
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(20, 6))
X, y = create_dataset(n_samples=5000, weights=(0.01, 0.05, 0.94),
class_sep=0.8)
# Baseline: LinearSVC fitted directly on the generated data.
clf = LinearSVC().fit(X, y)
plot_decision_function(X, y, clf, ax1)
ax1.set_title('Linear SVC with y={}'.format(Counter(y)))
# The sampler is given a LogisticRegression estimator and a fixed random_state.
sampler = InstanceHardnessThreshold(
random_state=0, estimator=LogisticRegression(solver='lbfgs',
multi_class='auto'))
# Same classifier, now fitted through a pipeline that starts with the sampler.
clf = make_pipeline(sampler, LinearSVC())
clf.fit(X, y)
plot_decision_function(X, y, clf, ax2)
ax2.set_title('Decision function for {}'.format(sampler.__class__.__name__))
# Last panel: the output of the sampler alone.
plot_resampling(X, y, sampler, ax3)
ax3.set_title('Resampling using {}'.format(sampler.__class__.__name__))
fig.tight_layout()
plt.show()
# nearest-neighbors rule while regular SMOTE will not make any distinction.
# Therefore, the decision function differs depending on the algorithm.
# Three panels comparing decision functions: no resampling, SMOTE, ADASYN.
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(20, 6))
X, y = create_dataset(n_samples=10000, weights=(0.01, 0.05, 0.94))
# Baseline: LinearSVC fitted directly on the generated data.
clf = LinearSVC().fit(X, y)
plot_decision_function(X, y, clf, ax1)
ax1.set_title('Linear SVC with y={}'.format(Counter(y)))
# SMOTE with default settings (no random_state is passed here).
sampler = SMOTE()
clf = make_pipeline(sampler, LinearSVC())
clf.fit(X, y)
plot_decision_function(X, y, clf, ax2)
ax2.set_title('Decision function for {}'.format(sampler.__class__.__name__))
# ADASYN with default settings, same pipeline shape as above.
sampler = ADASYN()
clf = make_pipeline(sampler, LinearSVC())
clf.fit(X, y)
plot_decision_function(X, y, clf, ax3)
ax3.set_title('Decision function for {}'.format(sampler.__class__.__name__))
fig.tight_layout()
###############################################################################
# Due to those sampling particularities, it can give rise to some specific
# issues as illustrated below.
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 15))
X, y = create_dataset(n_samples=5000, weights=(0.01, 0.05, 0.94),
class_sep=0.8)
ax_arr = ((ax1, ax2), (ax3, ax4))
for ax, sampler in zip(ax_arr, (SMOTE(random_state=0),
ADASYN(random_state=0))):
def pseudo_label(pipeline, x_lab, y_lab, x_unlab, y_unlab, threshold=None):
model = make_pipeline(*pipeline)
model.fit(x_lab, y_lab)
pseudo_lab = pd.DataFrame({
'actual': y_unlab,
'predict_proba': model.predict_proba(x_unlab)[:, 1]
})
if threshold:
results = threshold_metrics(pseudo_lab['actual'], pseudo_lab['predict_proba'], threshold=threshold)
else:
results = threshold_metrics(pseudo_lab['actual'], pseudo_lab['predict_proba'], rank_best='lab_gmean')
pseudo_lab['predicted'] = (pseudo_lab['predict_proba'] > results['lab_threshold']).astype(int)
y_pseudo = pseudo_lab['predicted'].values
results['lab_num_pos'] = np.sum(y_pseudo)
results['lab_num_neg'] = y_pseudo.shape[0] - results['lab_num_pos']