Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_dataloader(task_name="task", split="train"):
    """Build a DictDataLoader over a small synthetic single-task dataset.

    Each of the NUM_EXAMPLES inputs is the pair ``[i, i]`` and every label
    is 1.

    Args:
        task_name: Key under which the labels are stored in the Y_dict.
        split: Split name given to the underlying DictDataset.

    Returns:
        A DictDataLoader over the dataset, created with batch_size=BATCH_SIZE.
    """
    features = torch.FloatTensor([[idx, idx] for idx in range(NUM_EXAMPLES)])
    labels = torch.ones(NUM_EXAMPLES).long()
    return DictDataLoader(
        DictDataset(
            name="dataset",
            split=split,
            X_dict={"data": features},
            Y_dict={task_name: labels},
        ),
        batch_size=BATCH_SIZE,
    )
import random
import unittest
import numpy as np
import torch
from snorkel.classification import DictDataLoader, DictDataset
from snorkel.classification.training.schedulers import (
SequentialScheduler,
ShuffledScheduler,
)
# Module-level fixtures: two five-example "train" datasets. The first holds
# inputs 0-4 with label 1, the second holds inputs 5-9 with label 2.
dataset1 = DictDataset(
    name="d1",
    split="train",
    X_dict={"data": list(range(5))},
    Y_dict={"labels": torch.LongTensor([1] * 5)},
)
dataset2 = DictDataset(
    name="d2",
    split="train",
    X_dict={"data": list(range(5, 10))},
    Y_dict={"labels": torch.LongTensor([2] * 5)},
)

# One loader per dataset, each created with batch_size=2.
dataloader1 = DictDataLoader(dataset=dataset1, batch_size=2)
dataloader2 = DictDataLoader(dataset=dataset2, batch_size=2)
dataloaders = [dataloader1, dataloader2]
y1 = torch.Tensor([0, 0, 0, 0, 0])
dataset = DictDataset(
X_dict={"data1": x1}, Y_dict={"task1": y1}, name="new_data", split="train"
)
# Check if the dataset is correctly constructed
self.assertTrue(torch.equal(dataset[0][0]["data1"], x1[0]))
self.assertTrue(torch.equal(dataset[0][1]["task1"], y1[0]))
self.assertEqual(
repr(dataset),
"DictDataset(name=new_data, X_keys=['data1'], Y_keys=['task1'])",
)
# Test from_tensors inits with default values
dataset = DictDataset.from_tensors(x1, y1, "train")
self.assertEqual(
repr(dataset),
f"DictDataset(name={DEFAULT_DATASET_NAME}, "
f"X_keys=['{DEFAULT_INPUT_DATA_KEY}'], Y_keys=['{DEFAULT_TASK_NAME}'])",
def test_classifier_dataset(self):
"""Unit test of DictDataset"""
x1 = [
torch.Tensor([1]),
torch.Tensor([1, 2]),
torch.Tensor([1, 2, 3]),
torch.Tensor([1, 2, 3, 4]),
torch.Tensor([1, 2, 3, 4, 5]),
]
y1 = torch.Tensor([0, 0, 0, 0, 0])
dataset = DictDataset(
X_dict={"data1": x1}, Y_dict={"task1": y1}, name="new_data", split="train"
)
# Check if the dataset is correctly constructed
self.assertTrue(torch.equal(dataset[0][0]["data1"], x1[0]))
self.assertTrue(torch.equal(dataset[0][1]["task1"], y1[0]))
self.assertEqual(
repr(dataset),
"DictDataset(name=new_data, X_keys=['data1'], Y_keys=['task1'])",
)
# Test from_tensors inits with default values
dataset = DictDataset.from_tensors(x1, y1, "train")
self.assertEqual(
repr(dataset),
f"DictDataset(name={DEFAULT_DATASET_NAME}, "
# Test correct construction
dataloader = self.slice_model.make_slice_dataloader(
dataset=self.datasets[0], S=self.S
)
Y_dict = dataloader.dataset.Y_dict
self.assertEqual(len(Y_dict), 7)
self.assertIn("test_task", Y_dict)
self.assertIn("test_task_slice:base_pred", Y_dict)
self.assertIn("test_task_slice:base_ind", Y_dict)
self.assertIn("test_task_slice:f_pred", Y_dict)
self.assertIn("test_task_slice:f_ind", Y_dict)
self.assertIn("test_task_slice:g_pred", Y_dict)
self.assertIn("test_task_slice:g_ind", Y_dict)
# Test bad data input
bad_data_dataset = DictDataset(
name="test_data",
split="train",
X_dict={self.data_name: self.X},
Y_dict={"bad_labels": self.Y},
)
with self.assertRaisesRegex(ValueError, "labels missing"):
self.slice_model.make_slice_dataloader(dataset=bad_data_dataset, S=self.S)
torch.Tensor([1, 2, 3, 4, 5]),
]
y1 = torch.Tensor([0, 0, 0, 0, 0])
x2 = [
torch.Tensor([1, 2, 3, 4, 5]),
torch.Tensor([1, 2, 3, 4]),
torch.Tensor([1, 2, 3]),
torch.Tensor([1, 2]),
torch.Tensor([1]),
]
y2 = torch.Tensor([1, 1, 1, 1, 1])
dataset = DictDataset(
name="new_data",
split="train",
X_dict={"data1": x1, "data2": x2},
Y_dict={"task1": y1, "task2": y2},
)
dataloader1 = DictDataLoader(dataset=dataset, batch_size=2)
x_batch, y_batch = next(iter(dataloader1))
# Check if the dataloader is correctly constructed
self.assertEqual(dataloader1.dataset.split, "train")
self.assertTrue(torch.equal(x_batch["data1"], torch.Tensor([[1, 0], [1, 2]])))
self.assertTrue(
torch.equal(
x_batch["data2"], torch.Tensor([[1, 2, 3, 4, 5], [1, 2, 3, 4, 0]])
def test_remapped_labels(self):
    """Label keys with no matching task are ignored when nothing is remapped."""
    # Build a dataset whose Y_dict carries an extra label key ("other_task")
    # alongside the labels for the one task the model is built with.
    known_task = self.task1.name
    features = torch.FloatTensor([[idx, idx] for idx in range(NUM_EXAMPLES)])
    labels = torch.ones(NUM_EXAMPLES).long()
    dataset = DictDataset(
        name="dataset",
        split="train",
        X_dict={"data": features},
        Y_dict={known_task: labels, "other_task": labels},
    )
    dataloader = DictDataLoader(dataset, batch_size=BATCH_SIZE)

    model = MultitaskClassifier([self.task1])
    loss_dict, _ = model.calculate_loss(dataset.X_dict, dataset.Y_dict)
    self.assertIn("task1", loss_dict)

    # Without remapping, only the model's own task shows up in predictions...
    golds = model.predict(dataloader)["golds"]
    self.assertIn("task1", golds)
    self.assertNotIn("other_task", golds)

    # ...and in the reported scores.
    scores = model.score([dataloader])
    self.assertIn("task1/dataset/train/accuracy", scores)
    self.assertNotIn("other_task/dataset/train/accuracy", scores)
import numpy as np
import torch
from snorkel.classification import DictDataLoader, DictDataset
from snorkel.classification.training.schedulers import (
SequentialScheduler,
ShuffledScheduler,
)
# Module-level fixtures: two five-example "train" datasets. The first holds
# inputs 0-4 with label 1, the second holds inputs 5-9 with label 2.
dataset1 = DictDataset(
    name="d1",
    split="train",
    X_dict={"data": list(range(5))},
    Y_dict={"labels": torch.LongTensor([1] * 5)},
)
dataset2 = DictDataset(
    name="d2",
    split="train",
    X_dict={"data": list(range(5, 10))},
    Y_dict={"labels": torch.LongTensor([2] * 5)},
)

# One loader per dataset, each created with batch_size=2.
dataloader1 = DictDataLoader(dataset=dataset1, batch_size=2)
dataloader2 = DictDataLoader(dataset=dataset2, batch_size=2)
dataloaders = [dataloader1, dataloader2]
class SequentialTest(unittest.TestCase):
def test_sequential(self):
scheduler = SequentialScheduler()
data = []
for (batch, dl) in scheduler.get_batches(dataloaders):
def create_dataloader(df: pd.DataFrame, split: str) -> DictDataLoader:
    """Wrap a DataFrame with ``x1``, ``x2`` and ``y`` columns in a DictDataLoader.

    Args:
        df: Frame providing the "x1" and "x2" feature columns and the "y"
            label column.
        split: Split name for the dataset; shuffling is enabled only when the
            dataset's split is "train".

    Returns:
        A DictDataLoader with batch_size=4 over a DictDataset named "TestData".
    """
    # Stack the two feature columns side by side along dim=1.
    x1 = torch.tensor(df["x1"])
    x2 = torch.tensor(df["x2"])
    coordinates = torch.stack((x1, x2), dim=1)
    labels = torch.tensor(df["y"], dtype=torch.long)

    dataset = DictDataset(
        name="TestData",
        split=split,
        X_dict={"coordinates": coordinates},
        Y_dict={"task": labels},
    )
    shuffle = dataset.split == "train"
    return DictDataLoader(dataset=dataset, batch_size=4, shuffle=shuffle)
# %% [markdown]
# With our data loaded/created, we can now package it up into `DictDataset`s for training. A `DictDataset` is a simple wrapper around `torch.utils.data.Dataset` that stores data fields and labels as dictionaries.
#
# In the `DictDataset`, each label corresponds to a particular `Task` by name. We'll define these `Task` objects in the following section as we define our model.
#
# `DictDataLoader` is a wrapper around `torch.utils.data.DataLoader` that handles the collate function for `DictDataset` appropriately.
# %%
import torch
from snorkel.classification import DictDataset, DictDataLoader
dataloaders = []

# Circle task: wrap each split in a DictDataset and put a loader over it.
for split in ("train", "valid", "test"):
    X_dict = {"circle_data": torch.FloatTensor(circle_data_splits[split])}
    Y_dict = {"circle_task": torch.LongTensor(circle_label_splits[split])}
    dataset = DictDataset(
        name="CircleDataset", split=split, X_dict=X_dict, Y_dict=Y_dict
    )
    dataloader = DictDataLoader(dataset=dataset, batch_size=32)
    dataloaders += [dataloader]

# Square task: same treatment, appended after the circle loaders.
for split in ("train", "valid", "test"):
    X_dict = {"square_data": torch.FloatTensor(square_data_splits[split])}
    Y_dict = {"square_task": torch.LongTensor(square_label_splits[split])}
    dataset = DictDataset(
        name="SquareDataset", split=split, X_dict=X_dict, Y_dict=Y_dict
    )
    dataloader = DictDataLoader(dataset=dataset, batch_size=32)
    dataloaders += [dataloader]
# %% [markdown]
# We now have 6 data loaders, one for each task (`circle_task` and `square_task`) for each split (`train`, `valid`, `test`).
# %% [markdown]
# ## Define Model