def get_rec_iter(args, kv=None, dali_cpu=False):
    gpus = args.gpus
    num_threads = args.dali_threads
    num_validation_threads = args.dali_validation_threads
    pad_output = (args.image_shape[0] == 4)

    # The input_layout w.r.t. the model is the output_layout of the image pipeline.
    output_layout = types.NHWC if args.input_layout == 'NHWC' else types.NCHW

    # Under Horovod each process knows its own rank; otherwise fall back to
    # the kvstore's view of the cluster (single worker if kv is None).
    if 'horovod' in args.kv_store:
        rank = hvd.rank()
        nWrk = hvd.size()
    else:
        rank = kv.rank if kv else 0
        nWrk = kv.num_workers if kv else 1

    # Per-GPU batch size: the global batch is split across workers, then GPUs.
    batch_size = args.batch_size // nWrk // len(gpus)

    trainpipes = [HybridTrainPipe(args=args,
                                  batch_size=batch_size,
                                  num_threads=num_threads,
                                  device_id=gpu_id,
                                  rec_path=args.data_train,
                                  idx_path=args.data_train_idx,
                                  shard_id=gpus.index(gpu_id) + len(gpus) * rank,
                                  num_shards=len(gpus) * nWrk,
                                  crop_shape=args.image_shape[1:],
                                  output_layout=output_layout,
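The shard arithmetic above gives every GPU on every worker its own slice of the record file. A standalone sketch of the same computation, with hypothetical worker and GPU counts chosen only for illustration:

# Standalone sketch of the shard arithmetic used above; the worker and
# GPU counts are made-up values, chosen only for illustration.
gpus, rank, nWrk = [0, 1, 2, 3], 1, 2   # 4 GPUs on worker 1 of 2
for gpu_id in gpus:
    shard_id = gpus.index(gpu_id) + len(gpus) * rank
    num_shards = len(gpus) * nWrk
    print('gpu %d reads shard %d of %d' % (gpu_id, shard_id, num_shards))
# worker 1 reads shards 4..7 of 8; worker 0 would read shards 0..3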
def test_horovod_broadcast_type_error(self):
    """Test that the broadcast returns an error if the types being broadcasted
    differ among the processes."""
    hvd.init()
    rank = hvd.rank()
    size = hvd.size()

    # This test does not apply if there is only one worker.
    if size == 1:
        return

    ctx = self._current_context()
    shape = (17, 3)
    tensor = mx.nd.ones(shape=shape, ctx=ctx)
    if rank % 2 == 0:
        tensor = tensor.astype('int32')
    else:
        tensor = tensor.astype('float32')

    try:
        output = hvd.broadcast(tensor, 0)
        output.wait_to_read()
        assert False, 'hvd.broadcast did not throw type error'
    except (MXNetError, RuntimeError):
        pass
def test_horovod_broadcast_error(self):
    """Test that the broadcast returns an error if any dimension besides
    the first is different among the tensors being broadcasted."""
    hvd.init()
    rank = hvd.rank()
    size = hvd.size()

    # This test does not apply if there is only one worker.
    if size == 1:
        return

    ctx = self._current_context()
    shape = (17, rank + 1)
    tensor = mx.nd.ones(shape=shape, ctx=ctx)

    try:
        output = hvd.broadcast(tensor, 0)
        output.wait_to_read()
        assert False, 'hvd.broadcast did not throw error'
    except (MXNetError, RuntimeError):
        pass
def test_horovod_broadcast_rank_error(self):
    """Test that the broadcast returns an error if different ranks
    specify different root rank."""
    hvd.init()
    rank = hvd.rank()
    size = hvd.size()

    # This test does not apply if there is only one worker.
    if size == 1:
        return

    ctx = self._current_context()
    shape = (17, 17, 17)
    tensor = mx.nd.ones(shape=shape, ctx=ctx)

    try:
        output = hvd.broadcast(tensor, root_rank=rank)
        output.wait_to_read()
        assert False, 'hvd.broadcast did not throw rank error'
    except (MXNetError, RuntimeError):
        pass
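For contrast with the failure cases above, a minimal happy-path broadcast sketch (assumes horovod.mxnet and MXNet are installed and at least two processes are launched):

import mxnet as mx
import horovod.mxnet as hvd

hvd.init()
tensor = mx.nd.full((17, 17), hvd.rank())  # each rank starts with a different value
output = hvd.broadcast(tensor, root_rank=0)
output.wait_to_read()
assert (output.asnumpy() == 0).all()       # every rank now holds rank 0's tensor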
def test_horovod_allreduce(self):
    """Test that the allreduce correctly sums 1D, 2D, 3D tensors."""
    hvd.init()
    size = hvd.size()
    dtypes = self.filter_supported_types(['int32', 'int64',
                                          'float32', 'float64'])
    dims = [1, 2, 3]
    ctx = self._current_context()
    count = 0
    shapes = [(), (17,), (17, 17), (17, 17, 17)]
    for dtype, dim in itertools.product(dtypes, dims):
        # MXNet uses gpu_id as part of the seed, so to get identical seeds
        # we must set a context.
        mx.random.seed(1234, ctx=ctx)
        tensor = mx.nd.random.uniform(-100, 100, shape=shapes[dim],
                                      ctx=ctx)
        tensor = tensor.astype(dtype)
        summed = hvd.allreduce(tensor, average=False, name=str(count))
        multiplied = tensor * size
        max_difference = mx.nd.max(mx.nd.subtract(summed, multiplied))
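The test above is cut off before its assertion; the behaviour it exercises, summation of contributions across all ranks, can be sketched in isolation (a hedged, minimal version):

import mxnet as mx
import horovod.mxnet as hvd

hvd.init()
t = mx.nd.full((17, 17), hvd.rank() + 1.0)
summed = hvd.allreduce(t, average=False)
# With n workers every element becomes 1 + 2 + ... + n.
expected = hvd.size() * (hvd.size() + 1) // 2
assert (summed.asnumpy() == expected).all()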
    output = model(data.astype(args.dtype, copy=False))
    metric.update([label], [output])
return metric.get()
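The three lines above are the tail of an evaluate helper. Judging from the call evaluate(model, val_data, context) in the training loop further down, a hedged reconstruction of the full function would look roughly like:

def evaluate(model, data_iter, context):
    data_iter.reset()
    metric = mx.metric.Accuracy()
    for batch in data_iter:
        data = batch.data[0].as_in_context(context)
        label = batch.label[0].as_in_context(context)
        output = model(data.astype(args.dtype, copy=False))
        metric.update([label], [output])
    return metric.get()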
# Initialize Horovod
hvd.init()

# Polyaxon
if hvd.rank() == 0:
    experiment = Run()

# Horovod: pin context to local rank
context = mx.cpu(hvd.local_rank()) if args.no_cuda else mx.gpu(hvd.local_rank())
num_workers = hvd.size()

# Load training and validation data
train_data, val_data = get_mnist_iterator(hvd.rank())

# Build model
model = conv_nets()
model.cast(args.dtype)
model.hybridize()

# Define hyper parameters; the learning rate is scaled by the number of workers
optimizer_params = {'momentum': args.momentum,
                    'learning_rate': args.lr * hvd.size(),
                    'rescale_grad': 1.0 / args.batch_size}

# Add Horovod Distributed Optimizer
opt = mx.optimizer.create('sgd', **optimizer_params)
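The snippet stops right after the bare SGD optimizer is created; the distributed optimizer promised by the comment is usually the next step. A sketch continuing with the model, context, and opt defined above (the Xavier initializer is an assumption):

model.initialize(mx.init.Xavier(), ctx=context)
# Make every worker start from rank 0's parameters, then wrap the optimizer
# so gradients are averaged across workers on each step.
hvd.broadcast_parameters(model.collect_params(), root_rank=0)
trainer = hvd.DistributedTrainer(model.collect_params(), opt)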
def init_comm(backend, gpus):
    """Init communication backend"""
    # backend specific implementation
    if backend == 'horovod':
        try:
            import horovod.mxnet as hvd  # pylint: disable=import-outside-toplevel
        except ImportError:
            logging.info('horovod must be installed.')
            sys.exit(1)
        hvd.init()
        store = None
        num_workers = hvd.size()
        rank = hvd.rank()
        local_rank = hvd.local_rank()
        is_master_node = rank == local_rank
        ctx_l = [mx.gpu(local_rank)]
        logging.info('GPU communication supported by horovod')
    else:
        store = mx.kv.create(backend)
        num_workers = store.num_workers
        rank = store.rank
        local_rank = 0
        is_master_node = rank == local_rank
        if gpus == '-1' or gpus == '':
            ctx_l = [mx.cpu()]
            logging.info('Running on CPU')
        else:
            ctx_l = [mx.gpu(int(x)) for x in gpus.split(',')]
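Two hypothetical call sites for init_comm, assuming the elided tail of the function returns the values it computes:

# Assumed return: (store, num_workers, rank, local_rank, is_master_node, ctx_l)
store, num_workers, rank, local_rank, is_master, ctx_l = init_comm('horovod', '')
store, num_workers, rank, local_rank, is_master, ctx_l = init_comm('device', '0,1')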
    label = batch.label[0].as_in_context(context)
    with autograd.record():
        output = model(data.astype(args.dtype, copy=False))
        loss = loss_fn(output, label)
    loss.backward()
    trainer.step(args.batch_size)
    metric.update([label], [output])

    if nbatch % 100 == 0:
        name, acc = metric.get()
        logging.info('[Epoch %d Batch %d] Training: %s=%f' %
                     (epoch, nbatch, name, acc))

if hvd.rank() == 0:
    elapsed = time.time() - tic
    speed = nbatch * args.batch_size * hvd.size() / elapsed
    logging.info('Epoch[%d]\tSpeed=%.2f samples/s\tTime cost=%f',
                 epoch, speed, elapsed)

# Evaluate model accuracy
_, train_acc = metric.get()
name, val_acc = evaluate(model, val_data, context)
if hvd.rank() == 0:
    logging.info('Epoch[%d]\tTrain: %s=%f\tValidation: %s=%f', epoch, name,
                 train_acc, name, val_acc)
    # Polyaxon
    experiment.log_metrics(step=epoch, train_acc=train_acc, val_acc=val_acc)

if hvd.rank() == 0 and epoch == args.epochs - 1:
    # Polyaxon
    experiment.log_metrics(val_acc=val_acc)
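The training fragment above starts mid-batch. A hedged sketch of the enclosing loops it most plausibly sits in, following the structure of Horovod's MNIST examples (tic, nbatch, and data all come from here):

for epoch in range(args.epochs):
    tic = time.time()      # start-of-epoch timestamp used by the speed log
    train_data.reset()
    metric.reset()
    for nbatch, batch in enumerate(train_data, 1):
        data = batch.data[0].as_in_context(context)
        # ... per-batch body as shown above ...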
def _get_num_workers(self):
    try:
        import horovod.mxnet as hvd
        # hvd.size() raises ValueError if Horovod has not been initialized,
        # which the except clause below treats the same as "not installed".
        if hvd.size():
            return hvd.size()
    except (ModuleNotFoundError, ValueError, ImportError):
        pass
    return 1
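The ValueError arm is what makes this probe safe: Horovod's query helpers raise ValueError when called before hvd.init(). A tiny illustration:

import horovod.mxnet as hvd

try:
    hvd.size()       # raises ValueError if hvd.init() has not run yet
except ValueError:
    hvd.init()
print(hvd.size())    # safe after initialization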
zip_file_path = download('http://data.mxnet.io/mxnet/data/mnist.zip',
                         dirname=data_dir)
with zipfile.ZipFile(zip_file_path) as zf:
    zf.extractall(data_dir)

input_shape = (1, 28, 28)
batch_size = args.batch_size

train_iter = mx.io.MNISTIter(
    image="%s/train-images-idx3-ubyte" % data_dir,
    label="%s/train-labels-idx1-ubyte" % data_dir,
    input_shape=input_shape,
    batch_size=batch_size,
    shuffle=True,
    flat=False,
    num_parts=hvd.size(),
    part_index=hvd.rank()
)

val_iter = mx.io.MNISTIter(
    image="%s/t10k-images-idx3-ubyte" % data_dir,
    label="%s/t10k-labels-idx1-ubyte" % data_dir,
    input_shape=input_shape,
    batch_size=batch_size,
    flat=False,
)

return train_iter, val_iter
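Consuming the sharded iterators is ordinary MXNet iteration. A short sketch, assuming this body belongs to the get_mnist_iterator(rank) called earlier and that hvd.init() has already run:

train_iter, val_iter = get_mnist_iterator(hvd.rank())
for batch in train_iter:
    data = batch.data[0]    # shape (batch_size, 1, 28, 28); this rank's shard only
    label = batch.label[0]
    break
train_iter.reset()          # rewind before the next epoch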