# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment of a slice-aware training test. The enclosing `def`
# and the definitions of `f`, `g`, `dataloaders`, `create_task`, and the
# `self.df_train`/`self.df_valid` fixtures are outside this chunk -- confirm
# against the full file. Indentation in this chunk is flattened.
base_task = create_task("task", module_suffixes=["A", "B"])
# Apply SFs (slicing functions) over the train/valid DataFrames.
slicing_functions = [f, g] # low-coverage slices
slice_names = [sf.name for sf in slicing_functions]
applier = PandasSFApplier(slicing_functions)
S_train = applier.apply(self.df_train, progress_bar=False)
S_valid = applier.apply(self.df_valid, progress_bar=False)
# Add slice labels to both dataloaders so each slice gets its own head.
add_slice_labels(dataloaders[0], base_task, S_train)
add_slice_labels(dataloaders[1], base_task, S_valid)
# Convert the base task into per-slice ind/pred tasks.
tasks = convert_to_slice_tasks(base_task, slice_names)
model = MultitaskClassifier(tasks=tasks)
# Train
# NOTE: Needs more epochs to convergence with more heads
trainer = Trainer(lr=0.001, n_epochs=60, progress_bar=False)
trainer.fit(model, dataloaders)
scores = model.score(dataloaders)
# Confirm reasonably high slice scores on the train split: the base task,
# each slice's predictor ("_pred") and indicator ("_ind") heads all score
# > 0.9 F1; the base indicator is trivially perfect (every example is in
# the base slice), hence the exact 1.0 equality check.
self.assertGreater(scores["task/TestData/train/f1"], 0.9)
self.assertGreater(scores["task_slice:f_pred/TestData/train/f1"], 0.9)
self.assertGreater(scores["task_slice:f_ind/TestData/train/f1"], 0.9)
self.assertGreater(scores["task_slice:g_pred/TestData/train/f1"], 0.9)
self.assertGreater(scores["task_slice:g_ind/TestData/train/f1"], 0.9)
self.assertGreater(scores["task_slice:base_pred/TestData/train/f1"], 0.9)
self.assertEqual(scores["task_slice:base_ind/TestData/train/f1"], 1.0)
# NOTE(review): tail of a task-construction helper -- the `def` line and the
# bindings for `module1_name`, `module2_name`, `module_pool`, and `task_name`
# are above this chunk; verify against the full file.
# op1 reads the raw "data" field from the input payload.
op1 = Operation(module_name=module1_name, inputs=[("_input_", "data")])
# op2 consumes op1's output by name, chaining the two modules linearly.
op2 = Operation(module_name=module2_name, inputs=[op1.name])
op_sequence = [op1, op2]
task = Task(name=task_name, module_pool=module_pool, op_sequence=op_sequence)
return task
# Shared test fixtures: one dataloader per task name, two tasks that share
# the first module suffix ("A") but differ in the second, and a model built
# from only the first task.
# NOTE(review): `create_dataloader`, `create_task`, and `TASK_NAMES` are
# defined outside this chunk.
dataloaders = [create_dataloader(task_name) for task_name in TASK_NAMES]
tasks = [
create_task(TASK_NAMES[0], module_suffixes=["A", "A"]),
create_task(TASK_NAMES[1], module_suffixes=["A", "B"]),
]
model = MultitaskClassifier([tasks[0]])
# NOTE(review): indentation in this chunk is flattened -- the `def`s below
# belong inside TrainerTest. The class continues past this chunk
# (`test_trainer_errors` is truncated), and `base_config`, `model`, `tasks`,
# and `dataloaders` are defined outside this view.
class TrainerTest(unittest.TestCase):
def test_trainer_onetask(self):
"""Train a single-task model"""
trainer = Trainer(**base_config)
# Fit on just the first dataloader, matching the single-task model.
trainer.fit(model, [dataloaders[0]])
def test_trainer_twotask(self):
"""Train a model with overlapping modules and flows"""
multitask_model = MultitaskClassifier(tasks)
trainer = Trainer(**base_config)
trainer.fit(multitask_model, dataloaders)
def test_trainer_errors(self):
# Deep-copy so mutations below don't leak into the shared fixture.
# NOTE(review): body truncated here; remainder is outside this chunk.
dataloader = copy.deepcopy(dataloaders[0])
# NOTE(review): fragment of a slice test for a slicing function `h`. The
# enclosing `def` and the bindings for `slicing_functions`, `base_task`,
# `dataloaders`, `self.N_TRAIN`, and `self.N_VALID` are outside this chunk.
slice_names = [sf.name for sf in slicing_functions]
applier = PandasSFApplier(slicing_functions)
S_train = applier.apply(self.df_train, progress_bar=False)
S_valid = applier.apply(self.df_valid, progress_bar=False)
# The applier yields one structured record per example; "h" must appear as
# a named field of the structured dtype.
self.assertEqual(S_train.shape, (self.N_TRAIN,))
self.assertEqual(S_valid.shape, (self.N_VALID,))
self.assertIn("h", S_train.dtype.names)
# Add slice labels to both splits.
add_slice_labels(dataloaders[0], base_task, S_train)
add_slice_labels(dataloaders[1], base_task, S_valid)
# Convert the base task into per-slice ind/pred tasks.
tasks = convert_to_slice_tasks(base_task, slice_names)
model = MultitaskClassifier(tasks=tasks)
# Train
trainer = Trainer(lr=0.001, n_epochs=50, progress_bar=False)
trainer.fit(model, dataloaders)
scores = model.score(dataloaders)
# Confirm near perfect scores on the validation split.
self.assertGreater(scores["task/TestData/valid/accuracy"], 0.95)
self.assertGreater(scores["task_slice:h_pred/TestData/valid/accuracy"], 0.95)
self.assertGreater(scores["task_slice:h_ind/TestData/valid/f1"], 0.95)
# Calculate/check train/val loss
# NOTE(review): the chunk ends before `train_loss_output` is asserted on;
# the rest of this test is outside the visible range.
train_dataset = dataloaders[0].dataset
train_loss_output = model.calculate_loss(
train_dataset.X_dict, train_dataset.Y_dict
)
def test_twotask_all_overlap_model(self):
    """Build a two-task model whose tasks share every module and flow."""
    # Identical suffixes for both tasks mean they resolve to the same modules.
    shared_tasks = [
        create_task(name, module_suffixes=["A", "A"]) for name in ("task1", "task2")
    ]
    model = MultitaskClassifier(tasks=shared_tasks)
    # Two logical tasks and flows, but full sharing collapses the module
    # pool to just two entries.
    self.assertEqual(len(model.task_names), 2)
    self.assertEqual(len(model.op_sequences), 2)
    self.assertEqual(len(model.module_pool), 2)
def test_partially_empty_batch(self):
    """An abstain (-1) label is excluded from the per-task loss count."""
    ds = create_dataloader("task1", shuffle=False).dataset
    # Mark only the first example as abstain.
    ds.Y_dict["task1"][0] = -1
    model = MultitaskClassifier([self.task1])
    loss_dict, count_dict = model.calculate_loss(ds.X_dict, ds.Y_dict)
    # With one label abstained, the counted examples drop to 9.
    self.assertEqual(count_dict["task1"], 9)
def test_twotask_none_overlap_model(self):
    """Build a two-task model whose tasks share no modules or flows."""
    model = MultitaskClassifier(tasks=[self.task1, self.task2])
    # Disjoint tasks keep their own modules: two tasks, two flows, four modules.
    for attr, expected in (("task_names", 2), ("op_sequences", 2), ("module_pool", 4)):
        self.assertEqual(len(getattr(model, attr)), expected)
def test_empty_batch(self):
    """All-abstain labels yield empty loss and count dicts."""
    ds = create_dataloader("task1", shuffle=False).dataset
    # Abstain (-1) on every example so no loss can be computed at all.
    ds.Y_dict["task1"] = torch.full_like(ds.Y_dict["task1"], -1)
    model = MultitaskClassifier([self.task1])
    loss_dict, count_dict = model.calculate_loss(ds.X_dict, ds.Y_dict)
    # With nothing to score, both mappings come back empty (falsy).
    self.assertFalse(loss_dict)
    self.assertFalse(count_dict)
def test_no_data_parallel(self):
    """With dataparallel disabled, pooled modules stay plain nn.Modules."""
    model = MultitaskClassifier(tasks=[self.task1, self.task2], dataparallel=False)
    self.assertEqual(len(model.task_names), 2)
    # No DataParallel wrapper around the pooled module.
    self.assertIsInstance(model.module_pool["linear1A"], nn.Module)
# NOTE(review): tail of a forward pass -- presumably SimpleVoter's, given its
# use at L117 below; the enclosing `def` is outside this chunk. Classifies by
# parity: even inputs get a one-hot vote for class 0, odd inputs for class 1.
mask = x % 2 == 0
# One output row per input element, two classes. Assumes `x` is 1-D -- TODO
# confirm against the caller.
out = torch.zeros(x.shape[0], 2)
out[mask, 0] = 1 # class 0
out[~mask, 1] = 1 # class 1
return out
# NOTE(review): fragment of a scoring test using a deterministic parity
# voter; the enclosing `def` and the SimpleVoter class header are outside
# this chunk. The model is scored without training, which is fine because
# SimpleVoter is deterministic.
# Create model
task_name = "VotingTask"
module_name = "simple_voter"
module_pool = nn.ModuleDict({module_name: SimpleVoter()})
op0 = Operation(
module_name=module_name, inputs=[("_input_", "data")], name="op0"
)
op_sequence = [op0]
task = Task(name=task_name, module_pool=module_pool, op_sequence=op_sequence)
model = MultitaskClassifier([task])
# Create dataset: labels are 0 for x in 0..4 and 1 for x in 5..9, repeated
# 100 times so the score is computed over 1000 examples.
y_list = [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]
x_list = [i for i in range(len(y_list))]
Y = torch.LongTensor(y_list * 100)
X = torch.FloatTensor(x_list * 100)
dataset = DictDataset(
name="dataset", split="train", X_dict={"data": X}, Y_dict={task_name: Y}
)
# Create dataloaders
dataloader = DictDataLoader(dataset, batch_size=2, shuffle=False)
scores = model.score([dataloader])
# The parity voter (even -> 0, odd -> 1) matches these labels on 6 of every
# 10 examples (x in {0,2,4,5,7,9}), hence exactly 0.6 accuracy.
self.assertEqual(scores["VotingTask/dataset/train/accuracy"], 0.6)
# NOTE(review): fragment of a save/load round-trip test; the enclosing `def`
# and `create_task` are outside this chunk. The temp file is never removed
# and `os.close(fd)` is skipped if an assertion fails -- consider
# addCleanup/try-finally in the full file.
fd, checkpoint_path = tempfile.mkstemp()
task1 = create_task("task1")
task2 = create_task("task2")
# Make task2's second linear layer have different weights
task2.module_pool["linear2"] = nn.Linear(2, 2)
model = MultitaskClassifier([task1])
# Building the model from task1 shares task1's linear2 weights.
# NOTE(review): the `.module` access suggests a wrapper around the pooled
# module (e.g. DataParallel) -- confirm with MultitaskClassifier's ctor.
self.assertTrue(
torch.eq(
task1.module_pool["linear2"].weight,
model.module_pool["linear2"].module.weight,
).all()
)
model.save(checkpoint_path)
# Rebuild from task2, whose reinitialized linear2 must NOT match task1's.
model = MultitaskClassifier([task2])
self.assertFalse(
torch.eq(
task1.module_pool["linear2"].weight,
model.module_pool["linear2"].module.weight,
).all()
)
# Loading the checkpoint restores task1's saved weights into the new model.
model.load(checkpoint_path)
self.assertTrue(
torch.eq(
task1.module_pool["linear2"].weight,
model.module_pool["linear2"].module.weight,
).all()
)
os.close(fd)