from unittest import TestCase
import torch

try:
    from tensorboardX import SummaryWriter as XSummaryWriter
except ImportError:
    XSummaryWriter = None
from poutyne.framework import Model
from poutyne.framework.callbacks import CSVLogger, Callback, TensorBoardLogger
def some_data_generator(batch_size):
    # Infinite generator yielding random (x, y) batches of the given size.
    while True:
        x = torch.rand(batch_size, 1)
        y = torch.rand(batch_size, 1)
        yield x, y
class History(Callback):
    # Test callback that records every batch and epoch logs dict.
    def on_epoch_end(self, epoch_number, logs):
        self.history.append(logs)

    def on_batch_end(self, batch_number, logs):
        self.history.append(logs)

    def on_train_begin(self, logs):
        self.history = []
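# Usage sketch (illustrative; not part of the original test file): the History callback
# and some_data_generator defined above can be wired into a Poutyne Model. The linear
# network, MSE loss and SGD optimizer below are placeholder choices for the example.
import torch.nn as nn

def _history_usage_example():
    network = nn.Linear(1, 1)
    optimizer = torch.optim.SGD(network.parameters(), lr=1e-3)
    model = Model(network, optimizer, nn.MSELoss())
    history = History()
    model.fit_generator(some_data_generator(20), some_data_generator(20),
                        epochs=2, steps_per_epoch=5, validation_steps=5,
                        callbacks=[history])
    return history.history  # one logs dict per batch and per epoch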
class CSVLoggerTest(TestCase):
    batch_size = 20
    lr = 1e-3

    def setUp(self):
        torch.manual_seed(42)
    loss, (my_metric1, my_metric2), pred_y = model.evaluate_generator(
        test_generator, return_pred=True
    )

With batch metrics and the ``return_pred`` and ``return_ground_truth`` flags:

.. code-block:: python

    model = Model(pytorch_module, optimizer, loss_function,
                  batch_metrics=[my_metric1_fn, my_metric2_fn])
    loss, (my_metric1, my_metric2), pred_y, true_y = model.evaluate_generator(
        test_generator, return_pred=True, return_ground_truth=True
    )
"""
if steps is None:
    steps = len(generator)
step_iterator = StepIterator(generator, steps, Callback(), self.batch_metrics_names)
loss, batch_metrics, pred_y, true_y = self._validate(step_iterator,
                                                     return_pred=return_pred,
                                                     return_ground_truth=return_ground_truth)
epoch_metrics = self._get_epoch_metrics()
metrics = np.concatenate((batch_metrics, epoch_metrics))
return self._format_return(loss, metrics, pred_y, return_pred, true_y, return_ground_truth)
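# Illustration (hedged sketch, not library source): one possible definition of the
# my_metric1_fn / my_metric2_fn callables referenced in the docstring above, assuming
# Poutyne's (y_pred, y_true) argument order for batch metrics. Each call receives one
# batch and returns a scalar, which evaluate_generator averages over the evaluation batches.
def my_metric1_fn(y_pred, y_true):
    # Per-batch mean absolute error.
    return (y_pred - y_true).abs().mean()

def my_metric2_fn(y_pred, y_true):
    # Per-batch mean squared error.
    return ((y_pred - y_true) ** 2).mean()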
def _fit_batch_n_batches_per_step(self,
                                  x,
                                  y,
                                  batches_per_step,
                                  examples_in_step,
                                  *,
                                  callback=Callback(),
                                  step=None,
                                  return_pred=False):
    # pylint: disable=too-many-locals
    # Gradients are zeroed on the first batch of each group of `batches_per_step`
    # batches; `do_backprop` marks the last batch of the group, where the accumulated
    # gradients are applied.
    zero_all_gradients = ((step.number - 1) % batches_per_step == 0)
    do_backprop = (step.number % batches_per_step == 0)

    if zero_all_gradients:
        self.optimizer.zero_grad()

    loss_tensor, metrics, pred_y = self._compute_loss_and_metrics(x,
                                                                  y,
                                                                  return_loss_tensor=True,
                                                                  return_pred=return_pred)

    # Re-weight the mean batch loss by the batch size so that gradients accumulate
    # proportionally to the number of examples in the step.
    adjusted_loss_tensor = loss_tensor * step.size
    adjusted_loss_tensor.backward()
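# Plain-PyTorch sketch of the gradient-accumulation pattern used above (illustrative,
# not the library implementation). Gradients are zeroed on the first batch of each group
# of `batches_per_step` batches, each mean batch loss is re-scaled by its batch size
# before backward(), and the optimizer steps once per group. Normalizing the accumulated
# gradients by the total example count is an assumption suggested by the
# `examples_in_step` argument above.
def accumulate_gradients(network, optimizer, loss_function, batches, batches_per_step):
    examples_in_step = 0
    for i, (x, y) in enumerate(batches):
        if i % batches_per_step == 0:
            optimizer.zero_grad()
            examples_in_step = 0
        loss = loss_function(network(x), y)   # mean loss over this batch
        (loss * len(x)).backward()            # accumulate gradients weighted by batch size
        examples_in_step += len(x)
        if (i + 1) % batches_per_step == 0:
            for param in network.parameters():  # assumed: rescale to a per-example mean
                if param.grad is not None:
                    param.grad /= examples_in_step
            optimizer.step()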
def _fit_batch(self, x, y, *, callback=Callback(), step=None, return_pred=False):
    # Standard training step: zero gradients, forward pass with loss and metrics,
    # backward pass, then one optimizer update.
    self.optimizer.zero_grad()

    loss_tensor, metrics, pred_y = self._compute_loss_and_metrics(x,
                                                                  y,
                                                                  return_loss_tensor=True,
                                                                  return_pred=return_pred)

    loss_tensor.backward()
    callback.on_backward_end(step)
    self.optimizer.step()

    loss = float(loss_tensor)
    return loss, metrics, pred_y
def __iter__(self):
    self.callback.on_train_begin({})
    for epoch in range(self.initial_epoch, self.epochs + 1):
        self.callback.on_epoch_begin(epoch, {})
        epoch_begin_time = timeit.default_timer()

        train_step_iterator = StepIterator(self.train_generator, self.steps_per_epoch, self.callback,
                                           self.batch_metrics_names)

        valid_step_iterator = None
        if self.valid_generator is not None:
            valid_step_iterator = StepIterator(self.valid_generator, self.validation_steps, Callback(),
                                               self.batch_metrics_names)

        yield train_step_iterator, valid_step_iterator

        # Collect validation loss and metrics under 'val_'-prefixed keys.
        val_dict = {}
        if valid_step_iterator is not None:
            val_metrics_dict = {
                'val_' + metric_name: metric
                for metric_name, metric in zip(self.batch_metrics_names, valid_step_iterator.metrics)
            }
            val_metrics_dict.update({
                'val_' + metric_name: metric
                for metric_name, metric in zip(self.epoch_metrics_names, valid_step_iterator.epoch_metrics)
            })
            val_dict = {'val_loss': valid_step_iterator.loss, **val_metrics_dict}
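# Illustration (hedged sketch): the val_dict built above ends up in the logs passed to
# callbacks at the end of each epoch, so an on_epoch_end hook can read the
# 'val_'-prefixed entries. Exact metric key names depend on the metrics configured on
# the model; 'val_loss' is present whenever a validation generator is given.
class PrintValLoss(Callback):
    def on_epoch_end(self, epoch_number, logs):
        print(f"epoch {epoch_number}: val_loss={logs.get('val_loss')}")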