Evaluate given inputs and targets.
Parameters
----------
inputs: `torch.autograd.Variable`
targets: `torch.autograd.Variable`
Returns
-------
predictions: `torch.autograd.Variable`
loss: `torch.autograd.Variable`
"""
# NOTE(review): the enclosing `def` header is not visible in this fragment
# and indentation has been stripped; code lines below are kept byte-identical.
# Inference mode: disables dropout / batch-norm updates; no_grad() disables
# autograd graph construction for the forward pass.
self.model.eval()
with th.no_grad():
input_vars = np_to_var(inputs, pin_memory=self.pin_memory)
target_vars = np_to_var(targets, pin_memory=self.pin_memory)
# Move batch to GPU when the trainer was configured with cuda=True.
if self.cuda:
input_vars = input_vars.cuda()
target_vars = target_vars.cuda()
# Forward pass and loss on the converted batch.
outputs = self.model(input_vars)
loss = self.loss_function(outputs, target_vars)
# A single tensor exposes .cpu(); otherwise treat outputs as an
# iterable of tensors and convert each one.
if hasattr(outputs, "cpu"):
outputs = outputs.cpu().detach().numpy()
else:
# assume it is iterable
outputs = [o.cpu().detach().numpy() for o in outputs]
loss = loss.cpu().detach().numpy()
return outputs, loss
# Three further conv-pool blocks; each consumes the previous block's filter
# count (n_filters_conv -> n_filters_2 -> n_filters_3 -> n_filters_4).
add_conv_pool_block(
model, n_filters_conv, self.n_filters_2, self.filter_length_2, 2
)
add_conv_pool_block(
model, self.n_filters_2, self.n_filters_3, self.filter_length_3, 3
)
add_conv_pool_block(
model, self.n_filters_3, self.n_filters_4, self.filter_length_4, 4
)
# model.add_module('drop_classifier', nn.Dropout(p=self.drop_prob))
# eval() so the dummy forward pass below is deterministic (dropout off).
model.eval()
# "auto": infer the classifier's temporal kernel length by pushing a dummy
# all-ones input of shape (1, in_chans, input_time_length, 1) through the
# model and reading the remaining time dimension (output axis 2).
if self.final_conv_length == "auto":
out = model(
np_to_var(
np.ones(
(1, self.in_chans, self.input_time_length, 1),
dtype=np.float32,
)
)
)
n_out_time = out.cpu().data.numpy().shape[2]
self.final_conv_length = n_out_time
# Classifier head: conv mapping n_filters_4 feature maps to n_classes
# over final_conv_length time steps. NOTE(review): fragment is cut
# mid-call here; the closing arguments are not visible.
model.add_module(
"conv_classifier",
nn.Conv2d(
self.n_filters_4,
self.n_classes,
(self.final_conv_length, 1),
bias=True,
),
Evaluate given inputs and targets.
Parameters
----------
inputs: `torch.autograd.Variable`
targets: `torch.autograd.Variable`
Returns
-------
predictions: `torch.autograd.Variable`
loss: `torch.autograd.Variable`
"""
# Inference mode: no dropout / batch-norm updates; no_grad() skips building
# the autograd graph.
self.model.eval()
with th.no_grad():
# Convert numpy arrays to torch variables (optionally pinned memory).
input_vars = np_to_var(inputs, pin_memory=self.pin_memory)
target_vars = np_to_var(targets, pin_memory=self.pin_memory)
if self.cuda:
input_vars = input_vars.cuda()
target_vars = target_vars.cuda()
outputs = self.model(input_vars)
loss = self.loss_function(outputs, target_vars)
# Bring results back to numpy; a bare tensor has .cpu(), otherwise
# assume an iterable of tensors and convert each.
if hasattr(outputs, "cpu"):
outputs = outputs.cpu().detach().numpy()
else:
# assume it is iterable
outputs = [o.cpu().detach().numpy() for o in outputs]
loss = loss.cpu().detach().numpy()
return outputs, loss
# Guard: fitting requires compile() to have been called beforehand.
if (not hasattr(self, "compiled")) or (not self.compiled):
raise ValueError(
"Compile the model first by calling model.compile(loss, optimizer, metrics)"
)
# Cropped decoding pushes fixed-length windows through the network, so the
# window length must be supplied explicitly.
if self.cropped and input_time_length is None:
raise ValueError(
"In cropped mode, need to specify input_time_length,"
"which is the number of timesteps that will be pushed through"
"the network in a single pass."
)
train_X = _ensure_float32(train_X)
if self.cropped:
# Discover how many predictions the net emits per input window by
# running a dummy all-ones input of the requested length through it.
self.network.eval()
test_input = np_to_var(
np.ones(
(1, train_X[0].shape[0], input_time_length)
+ train_X[0].shape[2:],
dtype=np.float32,
)
)
# Append trailing singleton dims until the input is 4-D.
while len(test_input.size()) < 4:
test_input = test_input.unsqueeze(-1)
if self.cuda:
test_input = test_input.cuda()
out = self.network(test_input)
# Output axis 2 is the number of predictions per input window.
n_preds_per_input = out.cpu().data.numpy().shape[2]
self.iterator = CropsFromTrialsIterator(
batch_size=batch_size,
input_time_length=input_time_length,
n_preds_per_input=n_preds_per_input,
),
)
# Nonlinearity after the conv stage, then temporal pooling, pooling
# nonlinearity, and dropout.
model.add_module("conv_nonlin", Expression(self.conv_nonlin))
model.add_module(
"pool",
pool_class(
kernel_size=(self.pool_time_length, 1),
stride=(self.pool_time_stride, 1),
),
)
model.add_module("pool_nonlin", Expression(self.pool_nonlin))
model.add_module("drop", nn.Dropout(p=self.drop_prob))
# eval() so the dummy forward pass below is deterministic (dropout off).
model.eval()
# "auto": run a dummy (1, in_chans, input_time_length, 1) input through the
# model and use the surviving time dimension (output axis 2) as the
# classifier's temporal kernel length.
if self.final_conv_length == "auto":
out = model(
np_to_var(
np.ones(
(1, self.in_chans, self.input_time_length, 1),
dtype=np.float32,
)
)
)
n_out_time = out.cpu().data.numpy().shape[2]
self.final_conv_length = n_out_time
# Classifier head mapping n_filters_conv feature maps to n_classes.
# NOTE(review): fragment is cut mid-call here; closing args not visible.
model.add_module(
"conv_classifier",
nn.Conv2d(
n_filters_conv,
self.n_classes,
(self.final_conv_length, 1),
bias=True,
),
# Create weights for the convolution on demand:
# size or type of x changed...
in_channels = x.size()[1]
# One kernel per input channel (depthwise: groups=in_channels below).
weight_shape = (
in_channels,
1,
self.kernel_size[0],
self.kernel_size[1],
)
# Rebuild the cached weights only when shape, device placement, or dtype
# no longer matches the incoming tensor.
if self._pool_weights is None or (
(tuple(self._pool_weights.size()) != tuple(weight_shape))
or (self._pool_weights.is_cuda != x.is_cuda)
or (self._pool_weights.data.type() != x.data.type())
):
# Uniform weights 1/(kH*kW) make the convolution compute a mean over
# the kernel window, i.e. average pooling.
n_pool = np.prod(self.kernel_size)
weights = np_to_var(
np.ones(weight_shape, dtype=np.float32) / float(n_pool)
)
weights = weights.type_as(x)
if x.is_cuda:
weights = weights.cuda()
self._pool_weights = weights
# Depthwise conv implementing average pooling with the configured
# stride/dilation/padding.
pooled = F.conv2d(
x,
self._pool_weights,
bias=None,
stride=self.stride,
dilation=self.dilation,
padding=self.padding,
groups=in_channels,
)
Returns
-------
outs_per_trial: 2darray or list of 2darrays
Network outputs for each trial, optionally for each crop within trial.
"""
if individual_crops:
assert self.cropped, "Cropped labels only for cropped decoding"
X = _ensure_float32(X)
all_preds = []
with th.no_grad():
# Targets are not needed for prediction; supply dummy labels so the
# iterator's SignalAndTarget interface is satisfied.
dummy_y = np.ones(len(X), dtype=np.int64)
for b_X, _ in self.iterator.get_batches(
SignalAndTarget(X, dummy_y), False
):
b_X_var = np_to_var(b_X)
if self.cuda:
b_X_var = b_X_var.cuda()
all_preds.append(var_to_np(self.network(b_X_var)))
if self.cropped:
# Reassemble per-crop predictions into per-trial outputs.
outs_per_trial = compute_preds_per_trial_from_crops(
all_preds, self.iterator.input_time_length, X
)
if not individual_crops:
# Average predictions over crops (axis 1) for each trial.
outs_per_trial = np.array(
[np.mean(o, axis=1) for o in outs_per_trial]
)
else:
outs_per_trial = np.concatenate(all_preds)
return outs_per_trial
# Hard-coded 4 output classes. NOTE(review): dataset-specific — confirm
# against the dataset this script loads.
n_classes = 4
n_chans = int(train_set.X.shape[1])
# Build the requested architecture; final_conv_length differs per family.
if model == 'shallow':
model = ShallowFBCSPNet(n_chans, n_classes, input_time_length=input_time_length,
final_conv_length=30).create_network()
elif model == 'deep':
model = Deep4Net(n_chans, n_classes, input_time_length=input_time_length,
final_conv_length=2).create_network()
# Convert the network to emit dense per-timestep predictions, as needed
# for cropped decoding.
to_dense_prediction_model(model)
if cuda:
model.cuda()
log.info("Model: \n{:s}".format(str(model)))
# Dummy forward pass to discover how many predictions the network
# produces per input window (output axis 2).
dummy_input = np_to_var(train_set.X[:1, :, :, None])
if cuda:
dummy_input = dummy_input.cuda()
out = model(dummy_input)
n_preds_per_input = out.cpu().data.numpy().shape[2]
optimizer = optim.Adam(model.parameters())
iterator = CropsFromTrialsIterator(batch_size=batch_size,
input_time_length=input_time_length,
n_preds_per_input=n_preds_per_input)
# Stop when the epoch budget is exhausted or validation misclassification
# has not decreased for max_increase_epochs epochs.
stop_criterion = Or([MaxEpochs(max_epochs),
NoDecrease('valid_misclass', max_increase_epochs)])
monitors = [LossMonitor(), MisclassMonitor(col_suffix='sample_misclass'),