Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
n_chans,
n_classes,
input_time_length=input_time_length,
final_conv_length="auto",
).create_network()
if cuda:
model.cuda()
log.info("Model: \n{:s}".format(str(model)))
optimizer = optim.Adam(model.parameters())
iterator = BalancedBatchSizeIterator(batch_size=batch_size)
stop_criterion = Or(
[
MaxEpochs(max_epochs),
NoDecrease("valid_misclass", max_increase_epochs),
]
)
monitors = [LossMonitor(), MisclassMonitor(), RuntimeMonitor()]
model_constraint = MaxNormDefaultConstraint()
exp = Experiment(
model,
train_set,
valid_set,
test_set,
iterator=iterator,
loss_function=F.nll_loss,
optimizer=optimizer,
Parameters
----------
X: ndarray
Input data.
y: 1darray
Targets.
Returns
-------
result: dict
Dictionary with result metrics.
"""
X = _ensure_float32(X)
stop_criterion = MaxEpochs(0)
train_set = SignalAndTarget(X, y)
model_constraint = None
valid_set = None
test_set = None
loss_function = self.loss
if self.cropped:
loss_function = lambda outputs, targets: self.loss(
th.mean(outputs, dim=2), targets
)
# reset runtime monitor if exists...
for monitor in self.monitors:
if hasattr(monitor, "last_call_time"):
monitor.last_call_time = time.time()
exp = Experiment(
self.network,
n_preds_per_input = out.cpu().data.numpy().shape[2]
self.iterator = CropsFromTrialsIterator(
batch_size=batch_size,
input_time_length=input_time_length,
n_preds_per_input=n_preds_per_input,
seed=self.seed_rng.randint(0, np.iinfo(np.int32).max - 1),
)
else:
self.iterator = BalancedBatchSizeIterator(
batch_size=batch_size,
seed=self.seed_rng.randint(0, np.iinfo(np.int32).max - 1),
)
if log_0_epoch:
stop_criterion = MaxEpochs(epochs)
else:
stop_criterion = MaxEpochs(epochs - 1)
train_set = SignalAndTarget(train_X, train_y)
optimizer = self.optimizer
if scheduler is not None:
assert (
scheduler == "cosine"
), "Supply either 'cosine' or None as scheduler."
n_updates_per_epoch = sum(
[1 for _ in self.iterator.get_batches(train_set, shuffle=True)]
)
n_updates_per_period = n_updates_per_epoch * epochs
if scheduler == "cosine":
scheduler = CosineAnnealing(n_updates_per_period)
schedule_weight_decay = False
if optimizer.__class__.__name__ == "AdamW":
schedule_weight_decay = True
optimizer = ScheduledOptimizer(
def setup_after_stop_training(self):
"""
Setup training after first stop.
Resets parameters to best parameters and updates stop criterion.
"""
# also remember old monitor chans, will be put back into
# monitor chans after experiment finished
self.before_stop_df = deepcopy(self.epochs_df)
self.rememberer.reset_to_best_model(
self.epochs_df, self.model, self.optimizer
)
loss_to_reach = float(self.epochs_df["train_loss"].iloc[-1])
self.stop_criterion = Or(
stop_criteria=[
MaxEpochs(max_epochs=self.rememberer.best_epoch * 2),
ColumnBelow(
column_name="valid_loss", target_value=loss_to_reach
),
]
)
log.info("Train loss to reach {:.5f}".format(loss_to_reach))
test_input = test_input.cuda()
out = self.network(test_input)
n_preds_per_input = out.cpu().data.numpy().shape[2]
self.iterator = CropsFromTrialsIterator(
batch_size=batch_size,
input_time_length=input_time_length,
n_preds_per_input=n_preds_per_input,
seed=self.seed_rng.randint(0, np.iinfo(np.int32).max - 1),
)
else:
self.iterator = BalancedBatchSizeIterator(
batch_size=batch_size,
seed=self.seed_rng.randint(0, np.iinfo(np.int32).max - 1),
)
if log_0_epoch:
stop_criterion = MaxEpochs(epochs)
else:
stop_criterion = MaxEpochs(epochs - 1)
train_set = SignalAndTarget(train_X, train_y)
optimizer = self.optimizer
if scheduler is not None:
assert (
scheduler == "cosine"
), "Supply either 'cosine' or None as scheduler."
n_updates_per_epoch = sum(
[1 for _ in self.iterator.get_batches(train_set, shuffle=True)]
)
n_updates_per_period = n_updates_per_epoch * epochs
if scheduler == "cosine":
scheduler = CosineAnnealing(n_updates_per_period)
schedule_weight_decay = False
if optimizer.__class__.__name__ == "AdamW":