def test_probs_to_preds(self):
    np.testing.assert_array_equal(probs_to_preds(PROBS), PREDS)

    # abstains with ties
    probs = np.array([[0.33, 0.33, 0.33]])
    preds = probs_to_preds(probs, tie_break_policy="abstain")
    true_preds = np.array([-1])
    np.testing.assert_array_equal(preds, true_preds)

    # true random with ties
    probs = np.array([[0.33, 0.33, 0.33]])
    random_preds = []
    for _ in range(10):
        preds = probs_to_preds(probs, tie_break_policy="true-random")
        random_preds.append(preds[0])

    # check predicted labels within range
    self.assertLessEqual(max(random_preds), 2)
    self.assertGreaterEqual(min(random_preds), 0)

    # deterministic random with ties
    probs = np.array(
        [[0.33, 0.33, 0.33], [0.0, 0.5, 0.5], [0.33, 0.33, 0.33], [0.5, 0.5, 0]]
    )
    random_preds = []
    for _ in range(10):
        preds = probs_to_preds(probs, tie_break_policy="random")
        random_preds.append(preds)

    # check labels are the same across runs
    for i in range(len(random_preds) - 1):
        np.testing.assert_array_equal(random_preds[i], random_preds[i + 1])

    # check predicted labels within range (only one instance since they should all be the same)
    self.assertLessEqual(max(random_preds[0]), 2)
    self.assertGreaterEqual(min(random_preds[0]), 0)

    # check invalid policy
    with self.assertRaisesRegex(ValueError, "policy not recognized"):
        preds = probs_to_preds(probs, tie_break_policy="negative")

    # check invalid input
    with self.assertRaisesRegex(ValueError, "probs must have probabilities"):
        preds = probs_to_preds(np.array([[0.33], [0.33]]))
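
# The behaviors exercised by the test above can be summarized in a small reference
# sketch. This is an illustrative reimplementation of the tie-break semantics, not
# the library's actual code: "abstain" maps tied rows to -1, "true-random" picks a
# tied class with fresh randomness, and "random" breaks ties deterministically per
# row (here seeded from the row contents, an assumption made for illustration).
import zlib

import numpy as np


def _tie_break_sketch(probs: np.ndarray, policy: str = "abstain") -> np.ndarray:
    preds = np.full(probs.shape[0], -1, dtype=int)
    for i, row in enumerate(probs):
        winners = np.flatnonzero(row == row.max())
        if len(winners) == 1:
            preds[i] = winners[0]
        elif policy == "abstain":
            preds[i] = -1
        elif policy == "true-random":
            preds[i] = np.random.choice(winners)
        elif policy == "random":
            # deterministic per-row choice: seed a generator from the row's bytes
            rng = np.random.RandomState(zlib.crc32(row.tobytes()))
            preds[i] = rng.choice(winners)
        else:
            raise ValueError(f"tie_break_policy={policy} policy not recognized.")
    return preds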
def predict(
    self,
    L: np.ndarray,
    return_probs: Optional[bool] = False,
    tie_break_policy: str = "abstain",
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Return predicted labels, with ties broken according to policy.

    Returns
    -------
    np.ndarray
        An [n,1] array of integer labels

    (np.ndarray, np.ndarray)
        An [n,1] array of integer labels and an [n,k] array of probabilistic labels

    Example
    -------
    >>> L = np.array([[0, 0, -1], [1, 1, -1], [0, 0, -1]])
    >>> label_model = LabelModel(verbose=False)
    >>> label_model.fit(L)
    >>> label_model.predict(L)
    array([0, 1, 0])
    """
    Y_probs = self.predict_proba(L)
    Y_p = probs_to_preds(Y_probs, tie_break_policy)
    if return_probs:
        return Y_p, Y_probs
    return Y_p
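
# A minimal usage sketch for the method above, assuming a fitted LabelModel and a
# label matrix L as in the docstring example (the import path may vary by snorkel
# version). Passing return_probs=True returns both the hard predictions and the
# underlying probabilistic labels; tie_break_policy is forwarded to probs_to_preds.
import numpy as np
from snorkel.labeling.model import LabelModel

L = np.array([[0, 0, -1], [1, 1, -1], [0, 0, -1]])
label_model = LabelModel(verbose=False)
label_model.fit(L)

preds, probs = label_model.predict(L, return_probs=True, tie_break_policy="abstain")
print(preds)        # e.g. array([0, 1, 0])
print(probs.shape)  # (3, 2)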
# %% [markdown]
# ### Scikit-Learn Classifier
# %% [markdown]
# As we saw in Section 4, the `LabelModel` outputs probabilistic (float) labels.
# If the classifier we are training accepts target labels as floats, we can train on these labels directly (we describe the properties of this type of "noise-aware" loss in our [NeurIPS 2016 paper](https://arxiv.org/abs/1605.07723); a small worked illustration appears after the TensorFlow training cell below).
#
# If we want to use a library or model that doesn't accept probabilistic labels (such as Scikit-Learn), we can instead replace each label distribution with the label of the class that has the maximum probability.
# This can easily be done using the
# [`probs_to_preds` helper method](https://snorkel.readthedocs.io/en/master/packages/_autosummary/utils/snorkel.utils.probs_to_preds.html#snorkel.utils.probs_to_preds).
# We do note, however, that this transformation is lossy, as we no longer have values for our confidence in each label.
# %%
from snorkel.utils import probs_to_preds
preds_train_filtered = probs_to_preds(probs=probs_train_filtered)
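# %% [markdown]
# As a quick, illustrative sanity check of what this conversion does, the tiny cell
# below converts a made-up two-row probability matrix into hard labels; note that the
# 0.9 and 0.6 confidence values are discarded in the process.
# %%
import numpy as np

example_probs = np.array([[0.9, 0.1], [0.4, 0.6]])
print(probs_to_preds(example_probs))  # -> [0 1]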
# %% [markdown]
# We then use these labels to train a classifier as usual.
# %% {"tags": ["md-exclude-output"]}
from sklearn.linear_model import LogisticRegression
sklearn_model = LogisticRegression(C=1e3, solver="liblinear")
sklearn_model.fit(X=X_train, y=preds_train_filtered)
# %%
print(f"Test Accuracy: {sklearn_model.score(X=X_test, y=Y_test) * 100:.1f}%")
# %% {"tags": ["md-exclude-output"]}
from tf_model import get_model, get_feature_arrays
from utils import get_n_epochs
X_train = get_feature_arrays(df_train_filtered)
model = get_model()
batch_size = 64
model.fit(X_train, probs_train_filtered, batch_size=batch_size, epochs=get_n_epochs())
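# %% [markdown]
# For intuition on the "noise-aware" objective mentioned above: with probabilistic
# targets, the cross-entropy for one example is the probability-weighted sum of the
# per-class negative log-likelihoods. The cell below computes this by hand for an
# illustrative target/prediction pair (the numbers are made up).
# %%
import numpy as np

soft_target = np.array([0.7, 0.3])    # probabilistic label for one example
model_output = np.array([0.6, 0.4])   # model's predicted class probabilities
noise_aware_ce = -np.sum(soft_target * np.log(model_output))
print(f"soft cross-entropy: {noise_aware_ce:.3f}")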
# %% [markdown]
# Finally, we evaluate the trained model by measuring its F1 score and ROC-AUC.
# %%
X_test = get_feature_arrays(df_test)
probs_test = model.predict(X_test)
preds_test = probs_to_preds(probs_test)
print(
f"Test F1 when trained with soft labels: {metric_score(Y_test, preds=preds_test, metric='f1')}"
)
print(
f"Test ROC-AUC when trained with soft labels: {metric_score(Y_test, probs=probs_test, metric='roc_auc')}"
)
        # Note: store results under label_name
        # but retrieve from pre-computed results using task_name
        # (these accumulation lines run inside the evaluation loop over batches and label names)
        prob_dict_list[label_name].extend(prob_batch_dict[task_name])
        gold_dict_list[label_name].extend(Y.cpu().numpy())

gold_dict: Dict[str, np.ndarray] = {}
prob_dict: Dict[str, np.ndarray] = {}
for task_name in gold_dict_list:
    gold_dict[task_name] = np.array(gold_dict_list[task_name])
    prob_dict[task_name] = np.array(prob_dict_list[task_name])

if return_preds:
    pred_dict: Dict[str, np.ndarray] = defaultdict(list)
    for task_name, probs in prob_dict.items():
        pred_dict[task_name] = probs_to_preds(probs)

results = {"golds": gold_dict, "probs": prob_dict}

if return_preds:
    results["preds"] = pred_dict

return results
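
# A hypothetical sketch of consuming the results dictionary built above, assuming a
# fitted multi-task model `model`, a dataloader `dl`, and a task named "task1"; the
# call and key names mirror the fragment but are not taken verbatim from the library.
results = model.predict(dl, return_preds=True)
golds = results["golds"]["task1"]   # gold integer labels, shape [n]
probs = results["probs"]["task1"]   # probabilistic outputs, shape [n, k]
preds = results["preds"]["task1"]   # hard labels from probs_to_preds, shape [n]
print((preds == golds).mean())      # simple accuracy check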