def __init__(self, average=True):
    """
    Args:
        average (bool): whether to average or sum the gradients across processes.
    """
    import byteps.tensorflow as bps
    self.hvd = bps  # BytePS has the same interface as Horovod
    self.hvd.allreduce = bps.push_pull  # https://github.com/bytedance/byteps/issues/8

    assert os.environ.get("DMLC_ROLE", None) == "worker"
    assert "DMLC_WORKER_ID" in os.environ and "DMLC_NUM_WORKER" in os.environ
    bps.init()

    self.is_chief = bps.rank() == 0
    self._local_rank = bps.local_rank()
    self._rank = bps.rank()
    self._average = average
    self._compression = None
    self._has_compression = False
    logger.info("[BytePSTrainer] local rank={}".format(self._local_rank))
    SingleCostTrainer.__init__(self)
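# The assertions above expect the DMLC/BytePS environment to be configured
# before the trainer is constructed; normally BytePS's launcher sets this up.
# A minimal sketch of a single-worker setup with placeholder values (an
# assumption for illustration, not a working cluster config; a real run also
# needs DMLC_NUM_SERVER, DMLC_PS_ROOT_URI, and DMLC_PS_ROOT_PORT):
os.environ.setdefault("DMLC_ROLE", "worker")
os.environ.setdefault("DMLC_WORKER_ID", "0")   # this worker's index
os.environ.setdefault("DMLC_NUM_WORKER", "1")  # total number of workers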
import errno
import os
import sys

import numpy as np
import tensorflow as tf
from tensorflow import keras

import byteps.tensorflow as bps

# BytePS: initialize the library before any rank()/size() queries.
bps.init()


def log(s, nl=True):
    # only rank 0 prints, so each message appears once across all workers
    if bps.rank() != 0:
        return
    print(s, end='\n' if nl else '')
    sys.stdout.flush()
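# Example use of the helper above: every worker calls log(), but only rank 0
# actually prints, so the console shows a single copy of each message.
log('BytePS size: %d' % bps.size())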
# Keras automatically creates a cache directory in ~/.keras/datasets for
# storing the downloaded MNIST data. This creates a potential race
# condition among the workers that share the same filesystem. If the
# directory already exists by the time this worker gets around to creating
# it, ignore the resulting exception and continue.
cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
if not os.path.exists(cache_dir):
    try:
        os.mkdir(cache_dir)
    except OSError as e:
        if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
            pass
        else:
            raise
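# Note: on Python >= 3.2 the try/except dance above collapses into a single
# call that tolerates the same race condition:
#   os.makedirs(cache_dir, exist_ok=True)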
# Download and load the MNIST dataset.
(x_train, y_train), (x_test, y_test) = \
    keras.datasets.mnist.load_data('MNIST-data-%d' % bps.rank())
# The downloaded data has shape (-1, 28, 28), so we reshape it into
# (-1, 784) to feed it into our network. We also normalize the features
# to the range [0, 1].
x_train = np.reshape(x_train, (-1, 784)) / 255.0
x_test = np.reshape(x_test, (-1, 784)) / 255.0
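# conv_model is used below but not defined in this snippet. A minimal sketch
# of what such a helper could look like (an assumption, not the original
# implementation): a small convnet returning (predicted class, scalar loss).
def conv_model(feature, target, mode):
    target = tf.one_hot(tf.cast(target, tf.int32), depth=10)
    feature = tf.reshape(feature, [-1, 28, 28, 1])
    h = tf.layers.conv2d(feature, 32, 5, padding='same', activation=tf.nn.relu)
    h = tf.layers.max_pooling2d(h, 2, 2)
    h = tf.layers.conv2d(h, 64, 5, padding='same', activation=tf.nn.relu)
    h = tf.layers.max_pooling2d(h, 2, 2)
    h = tf.layers.dense(tf.layers.flatten(h), 1024, activation=tf.nn.relu)
    logits = tf.layers.dense(h, 10)
    loss = tf.losses.softmax_cross_entropy(target, logits)
    return tf.argmax(logits, axis=1), loss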
# Build model...
with tf.name_scope('input'):
    image = tf.placeholder(tf.float32, [None, 784], name='image')
    label = tf.placeholder(tf.float32, [None], name='label')
predict, loss = conv_model(image, label, tf.estimator.ModeKeys.TRAIN)
# BytePS: adjust learning rate based on number of GPUs.
opt = tf.train.RMSPropOptimizer(0.001 * bps.size())
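# BytePS mirrors Horovod's optimizer wrapper: wrapping the optimizer routes
# each gradient through push_pull before it is applied. A sketch of the next
# steps under that assumption (hook names follow the Horovod-style API that
# byteps.tensorflow exposes; the step count is a placeholder):
opt = bps.DistributedOptimizer(opt)
global_step = tf.train.get_or_create_global_step()
train_op = opt.minimize(loss, global_step=global_step)
hooks = [
    # start all workers from identical weights by broadcasting from rank 0
    bps.BroadcastGlobalVariablesHook(0),
    tf.train.StopAtStepHook(last_step=20000 // bps.size()),
]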