import math
import os
import socket
import subprocess
from itertools import chain

import numpy as np
import torch

from fairseq import checkpoint_utils, distributed_utils, options, tasks, utils

# Infer the distributed init method from the SLURM environment, if available.
# `node_list` comes from the SLURM environment (see the standalone sketch below).
try:
    hostnames = subprocess.check_output(['scontrol', 'show', 'hostnames', node_list])
    args.distributed_init_method = 'tcp://{host}:{port}'.format(
        host=hostnames.split()[0].decode('utf-8'),
        port=args.distributed_port)
    args.distributed_rank = int(os.environ.get('SLURM_PROCID'))
    args.device_id = int(os.environ.get('SLURM_LOCALID'))
except subprocess.CalledProcessError as e:  # scontrol failed
    raise e
except FileNotFoundError:  # Slurm is not installed
    pass

if args.distributed_init_method is None and args.distributed_port is None:
    raise ValueError('--distributed-init-method or --distributed-port '
                     'must be specified for distributed training')

args.distributed_rank = distributed_utils.distributed_init(args)
print('| initialized host {} as rank {}'.format(socket.gethostname(), args.distributed_rank))
single_process_main(args)
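# A standalone sketch (not fairseq code) of the SLURM inference above: derive a TCP
# rendezvous address and per-process ranks from SLURM environment variables. The
# helper name and the default port are assumptions made for illustration only.
import os
import subprocess


def slurm_init_method(port=12345):
    """Return (init_method, rank, local_rank) inferred from SLURM, or None."""
    node_list = os.environ.get('SLURM_JOB_NODELIST')
    if node_list is None:
        return None  # not running under SLURM
    try:
        hostnames = subprocess.check_output(
            ['scontrol', 'show', 'hostnames', node_list])
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None  # scontrol failed, or SLURM tools are unavailable
    master = hostnames.split()[0].decode('utf-8')  # first node hosts the rendezvous
    rank = int(os.environ['SLURM_PROCID'])         # global rank of this process
    local_rank = int(os.environ['SLURM_LOCALID'])  # GPU index on this node
    return 'tcp://{}:{}'.format(master, port), rank, local_rank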
def main(args, init_distributed=False):
    utils.import_user_module(args)

    assert args.max_tokens is not None or args.max_sentences is not None, \
        'Must specify batch size either with --max-tokens or --max-sentences'

    # Initialize CUDA and distributed training
    if torch.cuda.is_available() and not args.cpu:
        torch.cuda.set_device(args.device_id)
    np.random.seed(args.seed)
    torch.manual_seed(args.seed)
    if init_distributed:
        args.distributed_rank = distributed_utils.distributed_init(args)

    if distributed_utils.is_master(args):
        checkpoint_utils.verify_checkpoint_directory(args.save_dir)

    # Print args
    print(args)

    # Setup task, e.g., translation, language modeling, etc.
    task = tasks.setup_task(args)

    # Load valid dataset (we load training data below, based on the latest checkpoint)
    for valid_sub_split in args.valid_subset.split(','):
        task.load_dataset(valid_sub_split, combine=False, epoch=0)

    # Build model and criterion
    model = task.build_model(args)
parser = options.get_training_parser()

# Added arguments: layer-wise learning rate decay
parser.add_argument('--lr_decay', default=1, type=float,
                    help='Learning rate decay factor, 1.0 = no decay')
parser.add_argument('--lr_decay_layers', default=24, type=int,
                    help='Number of layers for learning rate decay')

args = options.parse_args_and_arch(parser)

if args.distributed_init_method is None:
    distributed_utils.infer_init_method(args)

if args.distributed_init_method is not None:
    # distributed training
    if torch.cuda.device_count() > 1 and not args.distributed_no_spawn:
        start_rank = args.distributed_rank
        args.distributed_rank = None  # assign automatically
        torch.multiprocessing.spawn(
            fn=distributed_main,
            args=(args, start_rank),
            nprocs=torch.cuda.device_count(),
        )
    else:
        distributed_main(args.device_id, args)
elif args.distributed_world_size > 1:
    # fallback for single node with multiple GPUs
    assert args.distributed_world_size <= torch.cuda.device_count()
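# A self-contained sketch of the torch.multiprocessing.spawn pattern used above:
# one process per device, each calling init_process_group before collective work.
# The gloo backend, local TCP port, and two-worker setup are assumptions for
# illustration; fairseq's distributed_main does considerably more than this.
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def _worker(rank, world_size):
    dist.init_process_group(
        backend='gloo',                       # 'nccl' is the usual choice on GPUs
        init_method='tcp://127.0.0.1:29500',  # assumed free local port
        world_size=world_size,
        rank=rank,
    )
    t = torch.tensor([float(rank)])
    dist.all_reduce(t)                        # default op: sum across all workers
    print('| rank {} sees sum {}'.format(rank, t.item()))
    dist.destroy_process_group()


if __name__ == '__main__':
    world_size = 2
    mp.spawn(fn=_worker, args=(world_size,), nprocs=world_size)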
if ooms > 0 and self._oom_batch is not None:
    self.handle_ooms(ooms)

if dummy_batch:
    return None

# gather logging outputs from all replicas
if self.args.distributed_world_size > 1 and (
    (not self.args.use_bmuf)
    or (
        self.args.use_bmuf
        and (self.get_num_updates() + 1) % self.args.global_sync_iter == 0
    )
):
    logging_outputs, sample_sizes, ooms, prev_norms = \
        zip(*distributed_utils.all_gather_list(
            [logging_outputs, sample_sizes, ooms, self._prev_grad_norm],
        ))
    logging_outputs = list(chain.from_iterable(logging_outputs))
    sample_sizes = list(chain.from_iterable(sample_sizes))
    ooms = sum(ooms)

    if not self.args.use_bmuf:
        assert (
            all(norm == prev_norms[0] for norm in prev_norms)
            or all(math.isnan(norm) or math.isinf(norm) for norm in prev_norms)
        ), 'Fatal error: gradients are inconsistent between workers'

self.meters['oom'].update(ooms, len(samples))
if ooms == self.args.distributed_world_size * len(samples):
    print('| WARNING: OOM in all workers, skipping update')
    self.zero_grad()
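# A hedged sketch of the consistency check above, written against plain
# torch.distributed instead of fairseq's all_gather_list; the function name
# check_grad_norms is an assumption, and an initialized process group is expected.
# Every worker contributes its gradient norm; the norms must all match unless
# every worker overflowed (nan/inf).
import math

import torch.distributed as dist


def check_grad_norms(local_norm):
    norms = [None] * dist.get_world_size()
    dist.all_gather_object(norms, local_norm)  # gather Python floats from all ranks
    consistent = (
        all(n == norms[0] for n in norms)
        or all(math.isnan(n) or math.isinf(n) for n in norms)
    )
    if not consistent:
        raise RuntimeError('Fatal error: gradients are inconsistent between workers')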
def cli_main():
    parser = options.get_training_parser()
    args = options.parse_args_and_arch(parser)

    if args.distributed_init_method is None:
        distributed_utils.infer_init_method(args)

    if args.distributed_init_method is not None:
        # distributed training
        if torch.cuda.device_count() > 1 and not args.distributed_no_spawn:
            start_rank = args.distributed_rank
            args.distributed_rank = None  # assign automatically
            torch.multiprocessing.spawn(
                fn=distributed_main,
                args=(args, start_rank),
                nprocs=torch.cuda.device_count(),
            )
        else:
            distributed_main(args.device_id, args)
    elif args.distributed_world_size > 1:
        # fallback for single node with multiple GPUs
        assert args.distributed_world_size <= torch.cuda.device_count()
# Excerpt from a validation step: on CUDA OOM, free gradients and retry the
# batch once; any other runtime error is re-raised.
except RuntimeError as e:
    if 'out of memory' in str(e) and not raise_oom:
        print('| WARNING: ran out of memory, retrying batch')
        for p in self.model.parameters():
            if p.grad is not None:
                p.grad = None  # free some memory
        if self.cuda:
            torch.cuda.empty_cache()
        return self.valid_step(sample, raise_oom=True)
    else:
        raise e

if ignore_results:
    logging_output, sample_size = {}, 0

# gather logging outputs from all replicas
if self.args.distributed_world_size > 1:
    logging_output, sample_size = zip(*distributed_utils.all_gather_list(
        [logging_output, sample_size],
    ))
    logging_output = list(logging_output)
    sample_size = list(sample_size)
else:
    logging_output = [logging_output]
    sample_size = [sample_size]

# aggregate logging outputs and sample sizes
logging_output = self.task.aggregate_logging_outputs(
    logging_output, self.get_criterion()
)
sample_size = self.task.grad_denom(
    sample_size, self.get_criterion()
)
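# A toy sketch (not fairseq's task.aggregate_logging_outputs) of what the
# aggregation step above amounts to: per-worker logging dicts are gathered
# into one list and reduced by summing matching keys.
def sum_logging_outputs(logging_outputs):
    agg = {}
    for log in logging_outputs:
        for key, value in log.items():
            agg[key] = agg.get(key, 0) + value
    return agg


# e.g. two workers' stats collapse into a single dict:
# sum_logging_outputs([{'ntokens': 10, 'loss': 2.0}, {'ntokens': 12, 'loss': 2.5}])
# -> {'ntokens': 22, 'loss': 4.5}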
def cli_main():
    parser = options.get_training_parser()
    parser.add_argument('--remove-bpe', nargs='?', const='@@ ', default=None,
                        help='remove BPE tokens before scoring '
                             '(can be set to sentencepiece); used for monitoring '
                             'and validation')
    args = options.parse_args_and_arch(parser)
    print_options_meaning_changes(args)

    if args.distributed_init_method is None:
        distributed_utils.infer_init_method(args)

    if args.distributed_init_method is not None:
        # distributed training
        if torch.cuda.device_count() > 1 and not args.distributed_no_spawn:
            start_rank = args.distributed_rank
            args.distributed_rank = None  # assign automatically
            torch.multiprocessing.spawn(
                fn=distributed_main,
                args=(args, start_rank),
                nprocs=torch.cuda.device_count(),
            )
        else:
            distributed_main(args.device_id, args)
    elif args.distributed_world_size > 1:
        # fallback for single node with multiple GPUs
        assert args.distributed_world_size <= torch.cuda.device_count()
def save_checkpoint(self, filename, extra_state):
    """Save all training state in a checkpoint file."""
    if distributed_utils.is_master(self.args):  # only save one checkpoint
        extra_state['train_meters'] = self.meters
        checkpoint_utils.save_state(
            filename, self.args, self.get_model().state_dict(), self.get_criterion(),
            self.optimizer, self.lr_scheduler, self.get_num_updates(),
            self._optim_history, extra_state,
        )
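# A minimal sketch of the same "only the master saves" idea using plain torch.save;
# fairseq's checkpoint_utils.save_state bundles much more state (args, criterion,
# lr scheduler, optimizer history) than this. The function name is an assumption.
import torch
import torch.distributed as dist


def save_checkpoint_rank0(filename, model, optimizer, num_updates):
    if dist.is_initialized() and dist.get_rank() != 0:
        return  # every rank except 0 skips the write
    torch.save(
        {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'num_updates': num_updates,
        },
        filename,
    )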
# Excerpt from a training step: sync statistics across workers, either via the
# "fast" tensor all-reduce path or via all_gather_list.
if self.fast_stat_sync:
    # normalize summed loss / nll_loss by sample size and convert nats to base 2
    all_reduce_list_tensor[2:4].div_(
        (all_reduce_list_tensor[0:1] * torch.log(torch.cuda.DoubleTensor([2])))
    )
    self._all_reduce_list = all_reduce_list_tensor.tolist()
    logging_output = {}
    [
        sample_size,
        logging_output["nsentences"],
        logging_output["loss"],
        logging_output["nll_loss"],
        logging_output["ntokens"],
        ooms,
    ] = self._all_reduce_list
elif self._sync_stats():
    logging_outputs, sample_sizes, ooms, prev_norms = zip(
        *distributed_utils.all_gather_list(
            [logging_outputs, sample_sizes, ooms, self._prev_grad_norm],
            max_size=getattr(self.args, 'all_gather_list_size', 16384),
        )
    )
    logging_outputs = list(chain.from_iterable(logging_outputs))
    sample_sizes = list(chain.from_iterable(sample_sizes))
    ooms = sum(ooms)

    if not self.args.use_bmuf:
        norms = [norm for norm in prev_norms if norm is not None]
        if not (
            all(norm == norms[0] for norm in norms)
            or all(math.isnan(norm) or math.isinf(norm) for norm in norms)
        ):
            raise RuntimeError(
                "Fatal error: gradients are inconsistent between workers."
            )