dataset_name = "test_dataset"
splits = ["train", "valid"]
self.datasets = [
    create_dataset(
        self.X, self.Y, split, dataset_name, self.data_name, self.task_name
    )
    for split in splits
]
self.slice_model = SliceAwareClassifier(
    base_architecture=self.mlp,
    head_dim=self.hidden_dim,
    slice_names=[sf.name for sf in sfs],
    input_data_key=self.data_name,
    task_name=self.task_name,
    scorer=Scorer(metrics=["f1"]),
)
def test_no_metrics(self) -> None:
    scorer = Scorer()
    self.assertEqual(scorer.score(*self._get_labels()), {})
DATA = [5, 10, 19, 22, 25]
@slicing_function()
def sf(x):
    return x.num < 20
# We expect 3/5 correct -> 0.6 accuracy
golds = np.array([0, 1, 0, 1, 0])
preds = np.array([0, 0, 0, 0, 0])
probs = preds_to_probs(preds, 2)
# In the slice, we expect the last 2 elements to be masked
# We expect 2/3 correct -> 0.666 accuracy
data = [SimpleNamespace(num=x) for x in DATA]
S = SFApplier([sf]).apply(data)
scorer = Scorer(metrics=["accuracy"])
# Test normal score
metrics = scorer.score(golds=golds, preds=preds, probs=probs)
self.assertEqual(metrics["accuracy"], 0.6)
# Test score_slices
slice_metrics = scorer.score_slices(S=S, golds=golds, preds=preds, probs=probs)
self.assertEqual(slice_metrics["overall"]["accuracy"], 0.6)
self.assertEqual(slice_metrics["sf"]["accuracy"], 2.0 / 3.0)
# Test as_dataframe=True
metrics_df = scorer.score_slices(
    S=S, golds=golds, preds=preds, probs=probs, as_dataframe=True
)
self.assertTrue(isinstance(metrics_df, pd.DataFrame))
self.assertEqual(metrics_df["accuracy"]["overall"], 0.6)
def test_scorer(self) -> None:
    def pred_sum(golds, preds, probs):
        return np.sum(preds)

    scorer = Scorer(
        metrics=["accuracy", "f1"], custom_metric_funcs=dict(pred_sum=pred_sum)
    )
    results = scorer.score(*self._get_labels())
    results_expected = dict(accuracy=0.6, f1=2 / 3, pred_sum=3)
    self.assertEqual(results, results_expected)
module_pool = nn.ModuleDict(
    {
        module1_name: nn.Sequential(nn.Linear(2, 20), nn.ReLU()),
        module2_name: nn.Linear(20, 2),
    }
)
op1 = Operation(module_name=module1_name, inputs=[("_input_", "coordinates")])
op2 = Operation(module_name=module2_name, inputs=[op1.name])
op_sequence = [op1, op2]
task = Task(
    name=task_name,
    module_pool=module_pool,
    op_sequence=op_sequence,
    scorer=Scorer(metrics=["f1", "accuracy"]),
)
return task
module_pool = nn.ModuleDict(
    {
        module1_name: nn.Sequential(nn.Linear(2, 20), nn.ReLU()),
        module2_name: nn.Linear(20, 2),
    }
)
op1 = Operation(module_name=module1_name, inputs=[("_input_", "coordinates")])
op2 = Operation(module_name=module2_name, inputs=[op1.name])
op_sequence = [op1, op2]
task = Task(
    name=task_name,
    module_pool=module_pool,
    op_sequence=op_sequence,
    scorer=Scorer(metrics=["accuracy"]),
)
return task
>>> label_model.fit(L)
>>> label_model.score(L, Y=np.array([1, 1, 1]))
{'accuracy': 0.6666666666666666}
>>> label_model.score(L, Y=np.array([1, 1, 1]), metrics=["f1"])
{'f1': 0.8}
"""
if tie_break_policy == "abstain":  # pragma: no cover
    logging.warning(
        "Metrics calculated over data points with non-abstain labels only"
    )

Y_pred, Y_prob = self.predict(
    L, return_probs=True, tie_break_policy=tie_break_policy
)
scorer = Scorer(metrics=metrics)
results = scorer.score(Y, Y_pred, Y_prob)
return results
module_pool = nn.ModuleDict(
    {
        "feat_extractor": feature_extractor,
        "prediction_head": fc,
        "feat_concat": FlatConcat(),
        "word_emb": WordEmb(),
    }
)
# define task flow through modules
op_sequence = get_op_sequence()
pred_cls_task = Task(
    name="visual_relation_task",
    module_pool=module_pool,
    op_sequence=op_sequence,
    scorer=Scorer(metrics=["f1_micro"]),
)
return MultitaskClassifier([pred_cls_task])
# %%
from functools import partial
import torch.nn.functional as F
from snorkel.analysis import Scorer
from snorkel.classification import Task
circle_task = Task(
    name="circle_task",
    module_pool=module_pool,
    op_sequence=op_sequence,
    loss_func=F.cross_entropy,
    output_func=partial(F.softmax, dim=1),
    scorer=Scorer(metrics=["accuracy"]),
)
# %% [markdown]
# Note that `Task` objects are not dependent on a particular dataset; multiple datasets can be passed through the same modules for pre-training or co-training.
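# %%
# Illustrative sketch, not part of the tutorial: because a `Task`'s operations refer
# only to field names in the input dict, any `DictDataLoader` that supplies those
# fields can be passed through the same modules. The dataset name, split, field
# names ("circle_data", "circle_task"), and shapes below are all assumptions.
import torch

from snorkel.classification import DictDataLoader, DictDataset

extra_dataset = DictDataset(
    name="extra_circles",
    split="train",
    X_dict={"circle_data": torch.rand(100, 2)},
    Y_dict={"circle_task": torch.randint(0, 2, (100,))},
)
extra_dataloader = DictDataLoader(extra_dataset, batch_size=32)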
# %% [markdown]
# ### Again, but faster
# %% [markdown]
# We'll now define the square task, but more succinctly—for example, using the fact that the default name for an `Operation` is its `module_name` (since most tasks only use their modules once per forward pass).
#
# We'll also define the square task to share the first module in its task flow (`base_mlp`) with the circle task to demonstrate how to share modules. (Note that this is purely for illustrative purposes; for this toy task, it is quite possible that this is not the optimal arrangement of modules).
#
# Finally, the most common task definitions we see in practice are classification tasks with cross-entropy loss and softmax on the output of the last module, with accuracy as the primary metric of interest. Since these are all the default values, we can drop them below for brevity.
# %%
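# A minimal sketch of the succinct definition described above, not the tutorial's
# exact code: the module name "square_head", the data field "square_data", and the
# output dimension of `base_mlp` (assumed to be 4 here) are illustrative assumptions,
# and `base_mlp` is assumed to be the circle task's first module defined earlier.
# The default loss (cross-entropy), output function (softmax), and scorer (accuracy)
# are omitted, and each Operation keeps its default name (its module_name).
import torch.nn as nn

from snorkel.classification import Operation

square_task = Task(
    name="square_task",
    module_pool=nn.ModuleDict({"base_mlp": base_mlp, "square_head": nn.Linear(4, 2)}),
    op_sequence=[
        Operation(module_name="base_mlp", inputs=[("_input_", "square_data")]),
        Operation(module_name="square_head", inputs=["base_mlp"]),
    ],
)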
def __init__(
    self, tasks: List[Task], name: Optional[str] = None, **kwargs: Any
) -> None:
    super().__init__()
    self.config = ClassifierConfig(**kwargs)
    self.name = name or type(self).__name__

    # Initialize the model attributes
    self.module_pool = nn.ModuleDict()
    self.task_names: Set[str] = set()
    self.op_sequences: Dict[str, Sequence[Operation]] = dict()
    self.loss_funcs: Dict[str, Callable[..., torch.Tensor]] = dict()
    self.output_funcs: Dict[str, Callable[..., torch.Tensor]] = dict()
    self.scorers: Dict[str, Scorer] = dict()

    # Build network with given tasks
    self._build_network(tasks)

    # Report the operation and module counts, including how many are shared across tasks
    all_ops = [op.name for t in tasks for op in t.op_sequence]
    unique_ops = set(all_ops)
    all_mods = [mod_name for t in tasks for mod_name in t.module_pool]
    unique_mods = set(all_mods)
    logging.info(
        f"Created multi-task model {self.name} that contains "
        f"task(s) {self.task_names} from "
        f"{len(unique_ops)} operations ({len(all_ops) - len(unique_ops)} shared) and "
        f"{len(unique_mods)} modules ({len(all_mods) - len(unique_mods)} shared)."
    )