overwrite_model (bool, optional): Whether to overwrite an existing model.
    If `cache_dir` and `load_model_from_dir` are the same and
    `overwrite_model` is `False`, the fitted model is saved to
    "cache_dir/fine_tuned". Defaults to False.
"""
# tb_writer = SummaryWriter()
# device = get_device("cpu" if num_gpus == 0 or not torch.cuda.is_available() else "gpu")
# self.model = move_to_device(self.model, device, num_gpus)
hvd.init()  # required before querying rank/size below
rank = hvd.rank()
local_rank = hvd.local_rank()
world_size = hvd.size()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
is_master = rank == 0
self.cache_dir = self.cache_dir + "/distributed_" + str(local_rank)
self.model = self.model.to(device)
# t_total = len(train_dataloader) * num_epochs
# t_total = len(train_dataloader) // gradient_accumulation_steps * num_epochs
max_steps = 48000  # hardcoded step budget instead of a derived t_total
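The two commented-out `t_total` lines show how the step budget would normally be derived; a minimal sketch of the gradient-accumulation variant, assuming `train_dataloader`, `gradient_accumulation_steps`, and `num_epochs` from the surrounding code:

# Sketch: one optimizer step happens per `gradient_accumulation_steps`
# batches, so the total step budget shrinks accordingly.
t_total = (len(train_dataloader) // gradient_accumulation_steps) * num_epochs
max_steps = t_total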
# Horovod-based multi-GPU backend
if using_horovod:  # guard inferred from the ImportError message below
    try:
        import horovod.torch as hvd
    except ImportError:
        raise SystemError("Horovod is not available; try setting using_horovod=False.")
    from nmtlab.trainers.distributed_optim import FlexibleDistributedOptimizer
    # Initialize Horovod
    hvd.init()
    # Pin GPU to be used to process local rank (one GPU per process)
    torch.cuda.set_device(hvd.local_rank())
    self._model = model
    self._model.cuda()
    self._optimizer = FlexibleDistributedOptimizer(
        self._optimizer, named_parameters=self._model.named_parameters())
    hvd.broadcast_parameters(self._model.state_dict(), root_rank=ROOT_RANK)
    # Set the scope of the training data for this rank
    self._dataset.set_gpu_scope(hvd.rank(), hvd.size())
elif self._multigpu:
    # PyTorch-based multi-GPU backend (nn.DataParallel)
    model.cuda()
    self._model = nn.DataParallel(model)
elif torch.cuda.is_available():
    # Single-GPU case
    self._model = model
    self._model.cuda()
else:
    # CPU fallback
    self._model = model
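`FlexibleDistributedOptimizer` is nmtlab's wrapper; the same pattern with stock Horovod is a sketch along these lines, assuming `model` and `optimizer` are already constructed:

import horovod.torch as hvd

hvd.init()
# Allreduce gradients across ranks on every optimizer step.
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# Start all ranks from identical weights and optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)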
to "cache_dir/fine_tuned". Defaults to False.
overwrite_model (bool, optional): Whether to overwrite an existing model.
If `cache_dir` and `load_model_from_dir` are the same and
`overwrite_model` is `False`, the fitted model is saved to
"cache_dir/fine_tuned". Defaults to False.
"""
# Distributed setup
step_per_log = 100
is_master = False
if distributed:
    hvd.init()
    run = Run.get_context()
    rank = hvd.rank()
    local_rank = hvd.local_rank()
    world_size = hvd.size()
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)
    is_master = rank == 0
    self.cache_dir = self.cache_dir + "/distributed_" + str(rank)
    self.model = self.model.to(device)
else:
    # hvd.init()
    # local_rank = hvd.local_rank()
    # world_size = hvd.size()
    pass  # single-process run; no Horovod initialization
try:
    logging.info('Actual start global step:', sess.run(global_step),
                 'learning rate:', sess.run(learning_rate),
                 'learning_rate_weight:', sess.run(learning_rate_weight))
except Exception:
    pass
if use_horovod:
    bcast = hvd.broadcast_global_variables(0)
    sess.run(bcast)

# before below
if gezi.has_env('METRIC'):
    # safe for using horovod
    model_path_ = _get_model_path(model_dir_)
    l = metric_eval_fn(model_path_)
    if not use_horovod or hvd.rank() == 0:
        print(list(zip(l[1], l[0])))
    exit(0)

if model_dir_:
    # if save_interval_epochs and num_steps_per_epoch and num_steps >= 0:
    epoch_dir = os.path.join(model_dir_, 'epoch')
    if not use_horovod or hvd.rank() == 0:
        gezi.try_mkdir(epoch_dir)
    checkpoint_path = os.path.join(model_dir_, 'model.ckpt')
    # coord = tf.train.Coordinator()
    # threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # tf.train.write_graph(sess.graph_def, model_dir, 'train.pbtxt')
only_one_step = False
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

# Horovod: initialize library.
hvd.init()
torch.manual_seed(args.seed)

if args.cuda:
    # Horovod: pin GPU to local rank.
    torch.cuda.set_device(hvd.local_rank())
    torch.cuda.manual_seed(args.seed)

kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}

train_dataset = \
    datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ]))
# Horovod: use DistributedSampler to partition the training data.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)
test_dataset = \
    datasets.MNIST('data-%d' % hvd.rank(), train=False, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ]))
def _get_sampler(dataset, is_distributed=_DISTRIBUTED):
    if is_distributed:
        return torch.utils.data.distributed.DistributedSampler(
            dataset, num_replicas=hvd.size(), rank=hvd.rank())
    else:
        return torch.utils.data.sampler.RandomSampler(dataset)
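A hypothetical usage of `_get_sampler`, so single- and multi-process runs share one loader-construction path (`train_dataset`, `args.batch_size`, and `kwargs` are assumed from the surrounding snippet, which builds the sampler directly instead):

train_sampler = _get_sampler(train_dataset)
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)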
train_dataset = \
    datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ]))
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)
test_dataset = \
    datasets.MNIST('data-%d' % hvd.rank(), train=False, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ]))
test_sampler = torch.utils.data.distributed.DistributedSampler(
    test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size,
                                          sampler=test_sampler, **kwargs)
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
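With a `DistributedSampler`, each epoch's shuffle should be reseeded so ranks keep drawing disjoint, epoch-varying shards; a minimal training-loop sketch using the loaders built above (`optimizer` is assumed to be a Horovod-wrapped optimizer, and the NLL loss pairs with the model's log_softmax output):

for epoch in range(1, args.epochs + 1):
    train_sampler.set_epoch(epoch)  # reshuffle the per-rank shards each epoch
    for data, target in train_loader:
        if args.cuda:
            data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        loss = F.nll_loss(model(data), target)
        loss.backward()
        optimizer.step()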
# Undo Horovod's lr scaling before checkpointing so the saved optimizer
# state carries the base learning rate.
optimizer_with_scaled_down_lr = get_optimizer_with_unscaled_lr(
    hvd, optimizer, optimizer_cls, model)
state = {
    'model': model.state_dict(),
    'optimizer': optimizer_with_scaled_down_lr.state_dict(),
}
torch.save(state, ckpt_file)
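Restoring is the mirror image; a minimal sketch assuming the same `ckpt_file` layout (re-scaling the learning rate back up for Horovod is left to the caller):

# Sketch: load the checkpoint written above back into model and optimizer.
state = torch.load(ckpt_file)
model.load_state_dict(state['model'])
optimizer.load_state_dict(state['optimizer'])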
if cuda_available:
    model.cuda()

# Petastorm: read data from the store with the correct shard for this rank.
# Setting num_epochs=None yields an infinite iterator, which lets ranks with
# unequal sample counts keep training and validating in lockstep.
with make_batch_reader(remote_store.train_data_path,
                       num_epochs=None,
                       cur_shard=hvd.rank(),
                       shard_count=hvd.size(),
                       hdfs_driver=PETASTORM_HDFS_DRIVER,
                       schema_fields=schema_fields) as train_reader:
    with make_batch_reader(remote_store.val_data_path,
                           num_epochs=None,
                           cur_shard=hvd.rank(),
                           shard_count=hvd.size(),
                           hdfs_driver=PETASTORM_HDFS_DRIVER,
                           schema_fields=schema_fields) \
            if should_validate else empty_batch_reader() as val_reader:
        train_loader = DataLoader(train_reader,
                                  batch_size=batch_size,
                                  shuffling_queue_capacity=shuffle_buffer_size)
        train_loader_iter = iter(train_loader)
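Because `num_epochs=None` makes the reader infinite, training pulls a fixed number of batches per epoch with `next()` rather than exhausting the iterator; a minimal sketch, assuming a `steps_per_epoch` computed elsewhere:

for _ in range(steps_per_epoch):
    # The infinite reader never raises StopIteration; each rank simply takes
    # its fixed share of steps, even if shard sizes differ.
    row = next(train_loader_iter)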
self.test_only = test_only

# Now decide on a sampler.
# base_sampler = torch.utils.data.SequentialSampler(self.dataset)
base_sampler = torch.utils.data.RandomSampler(dataset)

if not distributed:
    sampler = torch.utils.data.BatchSampler(base_sampler, batch_size, False)
    super(SeqDataloader, self).__init__(dataset,
                                        batch_sampler=sampler,
                                        num_workers=num_workers,
                                        collate_fn=self.collate_fn)
else:
    import horovod.torch as hvd
    sampler = DistributedSampler(dataset, num_replicas=hvd.size(), rank=hvd.rank())
    super(SeqDataloader, self).__init__(dataset,
                                        batch_size=batch_size,
                                        sampler=sampler,
                                        num_workers=num_workers,
                                        collate_fn=self.collate_fn,
                                        drop_last=False,
                                        timeout=timeout)
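A hypothetical construction of `SeqDataloader` under Horovod; the constructor signature is inferred from the arguments used above, and the literal values are placeholders:

import horovod.torch as hvd

hvd.init()
loader = SeqDataloader(dataset,
                       batch_size=32,
                       num_workers=2,
                       distributed=True,
                       test_only=False,
                       timeout=0)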