count += 1
# Threshold for floating point equality depends on number of
# ranks, since we're comparing against precise multiplication.
if size <= 3 or dtype in ['int32', 'int64']:
threshold = 0
elif size < 10:
threshold = 1e-4
elif size < 15:
threshold = 5e-4
else:
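# With 15 or more workers the accumulated floating point error grows too
# large for a meaningful comparison, so the remaining float cases are
# skipped entirely.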
break
if max_difference > threshold:
print("self", count, dtype, dim, max_difference, threshold)
print("tensor", bps.rank(), tensor)
print("multiplied", bps.rank(), multiplied)
assert max_difference <= threshold, \
    'bps.byteps_push_pull produces incorrect results for self'
print('test_byteps_push_pull_inplace passed')
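# A minimal sketch, not part of the original suite, of the contract the test
# above exercises: byteps_push_pull sum-reduces a tensor across all workers,
# so if every rank contributes (rank + 1) the result should hold
# size * (size + 1) / 2 in every element. The helper name and the tensor
# name "sketch_push_pull" are illustrative; the BytePS calls mirror the ones
# already used in this file.
def _push_pull_sum_sketch(self):
    ctx = self._current_context()
    tensor = mx.nd.ones((17,), ctx=ctx) * (bps.rank() + 1)
    bps.byteps_declare_tensor("sketch_push_pull")
    bps.byteps_push_pull(tensor, name="sketch_push_pull")
    tensor.wait_to_read()
    expected = bps.size() * (bps.size() + 1) / 2
    assert (tensor.asnumpy() == expected).all(), \
        'push_pull sketch: per-rank values should be summed across workers'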
def test_byteps_broadcast(self):
"""Test that the broadcast correctly broadcasts 1D, 2D, 3D tensors."""
rank = bps.rank()
size = bps.size()
# This test does not apply if there is only one worker.
if size == 1:
return
dtypes = ['int32', 'int64',
'float32', 'float64']
dims = [1, 2, 3]
ctx = self._current_context()
count = 300
shapes = [(), (17), (17, 17), (17, 17, 17)]
root_ranks = list(range(size))
for dtype, dim, root_rank in itertools.product(dtypes, dims,
root_ranks):
tensor = mx.nd.ones(shapes[dim], ctx=ctx) * rank
root_tensor = mx.nd.ones(shapes[dim], ctx=ctx) * root_rank
tensor = tensor.astype(dtype)
root_tensor = root_tensor.astype(dtype)
broadcast_tensor = bps.broadcast(tensor, root_rank=root_rank,
name=str(count))
if rank != root_rank:
if same(tensor.asnumpy(), root_tensor.asnumpy()):
print("broadcast", count, dtype, dim,
mx.nd.max(tensor == root_tensor))
print("tensor", bps.rank(), tensor)
print("root_tensor", bps.rank(), root_tensor)
print("comparison", bps.rank(), tensor == root_tensor)
assert not same(tensor.asnumpy(), root_tensor.asnumpy()), \
'bps.broadcast modifies source tensor'
if not same(broadcast_tensor.asnumpy(), root_tensor.asnumpy()):
print("broadcast", count, dtype, dim)
print("broadcast_tensor", bps.rank(), broadcast_tensor)
print("root_tensor", bps.rank(), root_tensor)
print("comparison", bps.rank(),
broadcast_tensor == root_tensor)
assert same(broadcast_tensor.asnumpy(), root_tensor.asnumpy()), \
'bps.broadcast produces incorrect broadcasted tensor'
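# A minimal sketch, not part of the original suite, of the broadcast contract
# exercised above: each rank starts from its own value and, after
# bps.broadcast, every rank should hold the root's value. The helper name and
# the tensor name "sketch_broadcast" are illustrative; the call signature
# mirrors test_byteps_broadcast.
def _broadcast_sketch(self, root_rank=0):
    ctx = self._current_context()
    tensor = mx.nd.ones((17,), ctx=ctx) * bps.rank()
    result = bps.broadcast(tensor, root_rank=root_rank,
                           name="sketch_broadcast")
    result.wait_to_read()
    assert (result.asnumpy() == root_rank).all(), \
        'broadcast sketch: every rank should receive the root value'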
def test_byteps_push_pull(self):
"""Test that the byteps_push_pull correctly sums 1D, 2D, 3D tensors."""
size = bps.size()
dtypes = self.filter_supported_types(['float32'])
dims = [1]
ctx = self._current_context()
count = 100
shapes = [(), (17)]
for dtype, dim in itertools.product(dtypes, dims):
# MXNet uses gpu_id as part of the seed, so to get identical seeds
# we must set a context.
mx.random.seed(10 + 10 * bps.rank(), ctx=ctx)
tensor = mx.nd.random.uniform(-100, 100, shape=shapes[dim],
ctx=ctx)
tensor = tensor.astype(dtype)
print("tensor before push_pull:", tensor)
bps.byteps_declare_tensor("tensor_" + str(count))
bps.byteps_push_pull(tensor, name="tensor_"+str(count))
tensor.wait_to_read()
print("tensor after push_pull:", tensor)
print('test_byteps_push_pull passed')
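# Condensed sketch of the BytePS integration that fit() below performs with a
# full MXNet Module: wrap the optimizer in bps.DistributedOptimizer so every
# gradient update goes through push_pull, then broadcast the initial
# parameters from rank 0 so all workers start from identical weights. The
# argument names and the Xavier initializer are illustrative placeholders,
# not part of the original example.
def _byteps_setup_sketch(module, train_iter, optimizer_name, optimizer_params):
    opt = mx.optimizer.create(optimizer_name, **optimizer_params)
    opt = bps.DistributedOptimizer(opt)
    module.bind(data_shapes=train_iter.provide_data,
                label_shapes=train_iter.provide_label)
    module.init_params(mx.init.Xavier())
    arg_params, aux_params = module.get_params()
    bps.broadcast_parameters(arg_params, root_rank=0)
    bps.broadcast_parameters(aux_params, root_rank=0)
    module.set_params(arg_params=arg_params, aux_params=aux_params)
    return opt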
def fit(args, network, data_loader, **kwargs):
"""
Train a model.
args : parsed argparse arguments
network : the symbol definition of the neural network
data_loader : function that returns the train and val data iterators
"""
# kvstore
# kv = mx.kvstore.create(args.kv_store)
# if args.gc_type != 'none':
# kv.set_gradient_compression({'type': args.gc_type,
# 'threshold': args.gc_threshold})
# logging
head = '%(asctime)-15s Node[' + str(bps.rank()) + '] %(message)s'
logging.basicConfig(level=logging.DEBUG, format=head)
logging.info('start with arguments %s', args)
# data iterators
(train, val) = data_loader(args, (bps.rank(), bps.size()))
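# When args.test_io is set, only benchmark the input pipeline throughput and
# return without training.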
if args.test_io:
tic = time.time()
for i, batch in enumerate(train):
for j in batch.data:
j.wait_to_read()
if (i + 1) % args.disp_batches == 0:
logging.info('Batch [%d]\tSpeed: %.2f samples/sec', i,
args.disp_batches * args.batch_size / (time.time() - tic))
tic = time.time()
return
# load model
if 'arg_params' in kwargs and 'aux_params' in kwargs:
arg_params = kwargs['arg_params']
aux_params = kwargs['aux_params']
else:
sym, arg_params, aux_params = _load_model(args, bps.rank())
if sym is not None:
assert sym.tojson() == network.tojson()
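# The loaded checkpoint must describe the same symbol graph as the network
# being trained, otherwise its saved weights cannot be reused.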
# save model
checkpoint = _save_model(args, bps.rank())
# devices for training
if args.cpu_train:
devs = [mx.cpu(bps.local_rank())]
else:
logging.info('Launch BytePS process on GPU-%d', bps.local_rank())
devs = [mx.gpu(bps.local_rank())]
# learning rate
lr, lr_scheduler = _get_lr_scheduler(args)
logging.info('[Epoch %d Batch %d] Training: %s=%f' %
             (epoch, i, name, acc))
if bps.rank() == 0:
elapsed = time.time() - tic
speed = train_size * num_workers / elapsed
logging.info('Epoch[%d]\tSpeed=%.2f samples/s\tTime cost=%f',
epoch, speed, elapsed)
# Evaluate model accuracy
_, train_acc = metric.get()
name, val_acc = evaluate(model, val_data, context)
if bps.rank() == 0:
logging.info('Epoch[%d]\tTrain: %s=%f\tValidation: %s=%f', epoch, name,
train_acc, name, val_acc)
if bps.rank() == 0 and epoch == args.epochs - 1:
assert val_acc > 0.96, \
    "Achieved accuracy (%f) is lower than expected (0.96)" % val_acc
else:
eval_metrics.append(mx.metric.create(loss_type))
else:
logging.warning("The output is not softmax_output, loss argument will be skipped!")
# callbacks that run after each batch
batch_end_callbacks = [mx.callback.Speedometer(
args.batch_size, args.disp_batches)]
if 'batch_end_callback' in kwargs:
cbs = kwargs['batch_end_callback']
batch_end_callbacks += cbs if isinstance(cbs, list) else [cbs]
# BytePS wrapper
opt = mx.optimizer.create(args.optimizer, sym=network, **optimizer_params)
# opt = bps.DistributedOptimizer(opt)
print(str(os.environ) + "=============" + str(bps.rank()))
# else:
opt = bps.DistributedOptimizer(opt)
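# The wrapped optimizer routes each gradient through a BytePS push_pull
# (push to the servers, pull back the aggregated result) before the weight
# update is applied.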
# BytePS: better to explicitly init
model.bind(data_shapes=train.provide_data,
label_shapes=train.provide_label)
if arg_params is None and aux_params is None:
model.init_params(initializer)
(arg_params, aux_params) = model.get_params()
if arg_params is not None:
bps.broadcast_parameters(arg_params, root_rank=0)
if aux_params is not None:
bps.broadcast_parameters(aux_params, root_rank=0)
model.set_params(arg_params=arg_params, aux_params=aux_params)
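# Broadcasting from root rank 0 before set_params ensures every worker starts
# from identical weights; set_params installs the synchronized arrays into
# the bound module.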