def test_horovod_allreduce_grad(self):
    """Test the correctness of the allreduce gradient."""
    hvd.init()
    size = hvd.size()

    # Only Tensors of floating point dtype can require gradients
    dtypes = [torch.FloatTensor, torch.DoubleTensor]
    if torch.cuda.is_available():
        dtypes += [torch.cuda.FloatTensor, torch.cuda.DoubleTensor]
        if _fp16_supported:
            dtypes += [torch.cuda.HalfTensor]

    dims = [1, 2, 3]
    for dtype, dim in itertools.product(dtypes, dims):
        torch.manual_seed(1234)
        tensor = torch.FloatTensor(*([17] * dim)).random_(-100, 100)
        tensor = self.cast_and_place(tensor, dtype)
        tensor.requires_grad_()
        summed = hvd.allreduce(tensor, average=False)

        summed.backward(self.cast_and_place(torch.ones([17] * dim), dtype))
        grad_out = tensor.grad.data.cpu().numpy()
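        # A plausible continuation of the excerpt above (an assumption, not part of the
        # original snippet): with average=False the allreduce sums the tensor across
        # workers, so every gradient element should equal hvd.size().
        expected = np.ones([17] * dim) * size
        err = np.linalg.norm(expected - grad_out)
        self.assertLess(err, 0.00000001,
                        "gradient %s differs from expected %s" % (grad_out, expected))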
def test_horovod_join_allreduce(self):
    """Test Join op with allreduce."""
    # "Join Op is not supported for PyTorch < 1.0"
    if not _v2_api:
        return

    hvd.init()
    rank = hvd.rank()
    size = hvd.size()

    dtypes = self.filter_supported_types([torch.IntTensor, torch.LongTensor,
                                          torch.FloatTensor, torch.DoubleTensor])
    if torch.cuda.is_available():
        dtypes += [torch.cuda.IntTensor, torch.cuda.LongTensor,
                   torch.cuda.FloatTensor, torch.cuda.DoubleTensor]
        if _fp16_supported:
            dtypes += [torch.cuda.HalfTensor]

    dims = [1, 2, 3]
    first_join_ranks = [0, 1]
    cachings = [False, True]
    for dtype, dim, first_join_rank, caching in itertools.product(dtypes, dims, first_join_ranks, cachings):
        torch.manual_seed(1234)
        # Use two tensors to test fusion
        tensor_a = torch.FloatTensor(*([5] * dim)).random_(-100, 100)
# Benchmark
log('Running benchmark...')
img_secs = []
for x in range(args.num_iters):
    time = timeit.timeit(benchmark_step, number=args.num_batches_per_iter)
    img_sec = args.batch_size * args.num_batches_per_iter / time
    log('Iter #%d: %.1f img/sec per %s' % (x, img_sec, device))
    img_secs.append(img_sec)

# Results
img_sec_mean = np.mean(img_secs)
img_sec_conf = 1.96 * np.std(img_secs)
log('Img/sec per %s: %.1f +-%.1f' % (device, img_sec_mean, img_sec_conf))
log('Total img/sec on %d %s(s): %.1f +-%.1f' %
    (hvd.size(), device, hvd.size() * img_sec_mean, hvd.size() * img_sec_conf))
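# The loop above relies on `log` and `benchmark_step` helpers defined elsewhere in the
# script. A minimal sketch of what they might look like (names and details here are
# assumptions; `model`, `optimizer`, `data`, and `target` are synthetic globals not
# shown in the excerpt):
import torch.nn.functional as F

def log(s):
    # Print from the rank-0 worker only, so output is not repeated per process.
    if hvd.rank() == 0:
        print(s)

def benchmark_step():
    # One training step on the synthetic batch: forward, backward, optimizer update.
    optimizer.zero_grad()
    output = model(data)
    loss = F.cross_entropy(output, target)
    loss.backward()
    optimizer.step()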
def _log_summary(data_length, duration):
    logger = _get_logger()
    images_per_second = data_length / duration
    logger.info('Data length: {}'.format(data_length))
    logger.info('Total duration: {:.3f}'.format(duration))
    logger.info('Total images/sec: {:.3f}'.format(images_per_second))
    logger.info('Batch size: (Per GPU {}: Total {})'.format(
        _BATCHSIZE, hvd.size() * _BATCHSIZE if _DISTRIBUTED else _BATCHSIZE))
    logger.info('Distributed: {}'.format('True' if _DISTRIBUTED else 'False'))
    logger.info('Num GPUs: {:.3f}'.format(hvd.size() if _DISTRIBUTED else 1))
    logger.info('Dataset: {}'.format('Synthetic' if _FAKE else 'Imagenet'))
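# A hypothetical call site for the summary helper above (the timing and training
# names are assumptions for illustration only):
import time

start = time.time()
train(model, train_loader, optimizer)  # assumed training routine
_log_summary(len(train_loader.dataset), time.time() - start)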
def _get_sampler(dataset, is_distributed=_DISTRIBUTED):
    if is_distributed:
        return torch.utils.data.distributed.DistributedSampler(
            dataset, num_replicas=hvd.size(), rank=hvd.rank()
        )
    else:
        return torch.utils.data.sampler.RandomSampler(dataset)
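# Typical usage of the sampler helper (a sketch; the loader and epoch-loop names are
# assumptions). With a DistributedSampler, set_epoch() should be called once per epoch
# so each worker sees a different shuffle of its shard.
sampler = _get_sampler(dataset)
loader = torch.utils.data.DataLoader(dataset, batch_size=_BATCHSIZE, sampler=sampler)
for epoch in range(num_epochs):
    if _DISTRIBUTED:
        sampler.set_epoch(epoch)
    for batch in loader:
        pass  # training step goes here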
def device_count(self):
    if self._multigpu:
        if self.using_horovod():
            import horovod.torch as hvd
            return hvd.size()
        else:
            return torch.cuda.device_count()
    else:
        return 1
        # (closing brackets of the optimizer_grouped_parameters definition, omitted above)
        ]
    },
]

num_examples = len(train_dataset)
num_batches = int(num_examples / batch_size)
num_train_optimization_steps = num_batches * num_epochs

if warmup_proportion is None:
    optimizer = BertAdam(
        optimizer_grouped_parameters, lr=lr * hvd.size()
    )
else:
    optimizer = BertAdam(
        optimizer_grouped_parameters,
        lr=lr * hvd.size(),
        t_total=num_train_optimization_steps,
        warmup=warmup_proportion,
    )

# Horovod: (optional) compression algorithm.
compression = (
    hvd.Compression.fp16 if fp16_allreduce else hvd.Compression.none
)

# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
    optimizer,
    named_parameters=self.model.named_parameters(),
    compression=compression,
)
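# After wrapping the optimizer, Horovod training code usually broadcasts the initial
# model parameters and optimizer state from rank 0 so all workers start in sync (the
# same pattern appears in the CIFAR-10 snippet further below); a minimal sketch:
hvd.broadcast_parameters(self.model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)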
train_set = datasets.CIFAR10(root="data/cifar", train=True, transform=transform_ops, download=True)
test_set = datasets.CIFAR10(root="data/cifar", train=False, transform=transform_ops, download=True)

# if distributed over multiple GPUs, set up a barrier ensuring that all processes have loaded the data
if distributed:
    hvd.allreduce_(torch.Tensor(0), name='barrier')

# get dataset for training and testing of the model
if not root_process:
    train_set = datasets.CIFAR10(root="data/cifar", train=True, transform=transform_ops, download=True)
    test_set = datasets.CIFAR10(root="data/cifar", train=False, transform=transform_ops, download=True)

# set up data samplers
if distributed:
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_set, num_replicas=hvd.size(), rank=hvd.rank())
    test_sampler = torch.utils.data.distributed.DistributedSampler(
        test_set, num_replicas=hvd.size(), rank=hvd.rank())

# set up mini-batch enumerators for both the train set and the test set
train_loader = torch.utils.data.DataLoader(
    dataset=train_set, sampler=train_sampler if distributed else None,
    batch_size=batch_size, shuffle=False if distributed else True, drop_last=True, **kwargs)
test_loader = torch.utils.data.DataLoader(
    dataset=test_set, sampler=test_sampler if distributed else None,
    batch_size=batch_size, shuffle=False if distributed else True, drop_last=True, **kwargs)

# Distributed: broadcast parameters to all the processes
if distributed:
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)
def __init__(self, dataset, batch_size, distributed=False, num_workers=0, timeout=1000):
    if not distributed:
        super(ChunkDataloader, self).__init__(dataset,
                                              batch_size=batch_size,
                                              shuffle=True,
                                              num_workers=num_workers,
                                              collate_fn=self.collate_fn,
                                              timeout=timeout)
    else:
        import horovod.torch as hvd
        sampler = DistributedSampler(dataset, num_replicas=hvd.size(), rank=hvd.rank())
        super(ChunkDataloader, self).__init__(dataset,
                                              batch_size=batch_size,
                                              sampler=sampler,
                                              num_workers=num_workers,
                                              collate_fn=self.collate_fn,
                                              drop_last=False,
                                              timeout=timeout)
    elif epoch < 90:
        lr_adj = 1e-2
    else:
        lr_adj = 1e-3
elif type == "cosine":
    # self.init_lr * 0.5 * (1 + math.cos(math.pi * T_cur / T_total))
    run_epochs = epoch - args.warmup_epochs
    total_epochs = args.epochs - args.warmup_epochs
    T_cur = float(run_epochs * len(train_loader)) + batch_idx
    T_total = float(total_epochs * len(train_loader))
    lr_adj = 0.5 * (1 + math.cos(math.pi * T_cur / T_total))

for param_group in optimizer.param_groups:
    param_group['lr'] = args.base_lr * hvd.size() * lr_adj
return args.base_lr * hvd.size() * lr_adj
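# A sketch of how a per-batch schedule like the one above is typically driven from the
# training loop (the adjust_learning_rate name and loop variables are assumptions;
# assumes `import torch.nn.functional as F`):
for batch_idx, (data, target) in enumerate(train_loader):
    adjust_learning_rate(epoch, batch_idx)
    optimizer.zero_grad()
    loss = F.cross_entropy(model(data), target)
    loss.backward()
    optimizer.step()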