def create_training_operations(self, config):
    num_actions = sum(util.prod(self.actions_config[name]['shape']) for name in sorted(self.action))

    # Get hidden layers from network generator, then add NAF outputs, same for target network
    flat_mean = layers['linear'](x=self.training_network.output, size=num_actions, scope='naf_action_means')
    n = 0
    for name in sorted(self.action):
        shape = self.actions_config[name]['shape']
        self.action_taken[name] = tf.reshape(tensor=flat_mean[:, n: n + util.prod(shape)], shape=((-1,) + shape))
        n += util.prod(shape)

    # Advantage computation
    # Network outputs entries of lower triangular matrix L
    lower_triangular_size = num_actions * (num_actions + 1) // 2
    l_entries = layers['linear'](x=self.training_network.output, size=lower_triangular_size, scope='naf_matrix_entries')
    l_matrix = tf.exp(x=tf.map_fn(fn=tf.diag, elems=l_entries[:, :num_actions]))
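The snippet stops after building the diagonal part of L; in the NAF paper construction the remaining lower_triangular_size - num_actions entries fill the strictly lower triangle, and the advantage is A(s, a) = -1/2 (a - mu)^T L L^T (a - mu). Below is a minimal NumPy sketch of that construction; names and the fill order follow the paper, not necessarily this repository's continuation.

import numpy as np

def naf_advantage(l_flat, mean, action):
    # Assemble lower-triangular L from its flat entries, then
    # A(s, a) = -0.5 * (a - mu)^T (L L^T) (a - mu)  (NAF paper construction).
    num_actions = mean.shape[0]
    l_matrix = np.zeros((num_actions, num_actions))
    np.fill_diagonal(l_matrix, np.exp(l_flat[:num_actions]))    # exp keeps L L^T positive definite
    rows, cols = np.tril_indices(num_actions, k=-1)
    l_matrix[rows, cols] = l_flat[num_actions:]                  # strictly lower-triangular entries
    p_matrix = l_matrix @ l_matrix.T
    diff = action - mean
    return -0.5 * diff @ p_matrix @ diff

num_actions = 2
l_flat = np.random.randn(num_actions * (num_actions + 1) // 2)   # matches lower_triangular_size
print(naf_advantage(l_flat, mean=np.zeros(num_actions), action=np.array([0.3, -0.1])))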
def tf_kl_divergence(self, states, internals, actions, terminal, reward, next_states, next_internals, update, reference=None):
    embedding = self.network.apply(x=states, internals=internals, update=update)
    kl_divergences = list()
    for name in sorted(self.distributions):
        distribution = self.distributions[name]
        distr_params = distribution.parameterize(x=embedding)
        fixed_distr_params = tuple(tf.stop_gradient(input=value) for value in distr_params)
        kl_divergence = distribution.kl_divergence(distr_params1=fixed_distr_params, distr_params2=distr_params)
        collapsed_size = util.prod(util.shape(kl_divergence)[1:])
        kl_divergence = tf.reshape(tensor=kl_divergence, shape=(-1, collapsed_size))
        kl_divergences.append(kl_divergence)

    kl_divergence_per_instance = tf.reduce_mean(input_tensor=tf.concat(values=kl_divergences, axis=1), axis=1)
    return tf.reduce_mean(input_tensor=kl_divergence_per_instance, axis=0)
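For a concrete sense of what the loop computes when a distribution is a diagonal Gaussian, here is a standalone NumPy sketch. The closed-form KL below is the standard Gaussian formula; in the method above the actual computation is delegated to distribution.kl_divergence, and tf.stop_gradient is what turns the first argument into a fixed reference that contributes no gradients.

import numpy as np

def gaussian_kl(mu1, sigma1, mu2, sigma2):
    # Per-component KL(N(mu1, sigma1) || N(mu2, sigma2)) for diagonal Gaussians.
    return (np.log(sigma2 / sigma1)
            + (sigma1 ** 2 + (mu1 - mu2) ** 2) / (2.0 * sigma2 ** 2)
            - 0.5)

batch, components = 4, 3
mu_fixed = np.random.randn(batch, components)        # plays the role of the stop_gradient copy
sigma_fixed = np.ones((batch, components))
mu_current = mu_fixed + 0.1                           # slightly shifted "current" parameters
sigma_current = np.full((batch, components), 1.2)

kl = gaussian_kl(mu_fixed, sigma_fixed, mu_current, sigma_current)   # shape (batch, components)
kl_per_instance = kl.reshape(batch, -1).mean(axis=1)                 # mirrors the reshape/concat/mean
print(kl_per_instance.mean())                                        # scalar, as returned above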
def flatten(x, scope='flatten', summary_level=0):
    """Flatten layer.

    Args:
        x: Input tensor

    Returns: Input tensor reshaped to (batch_size, prod(remaining dims)),
        i.e. all non-batch dimensions collapsed into one
    """
    with tf.variable_scope(scope):
        x = tf.reshape(tensor=x, shape=(-1, util.prod(x.get_shape().as_list()[1:])))

    return x
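A quick NumPy stand-in for the reshape this layer performs (not part of the library):

import numpy as np

x = np.zeros((32, 8, 8, 4))                          # e.g. a conv feature map: (batch, height, width, channels)
flat = x.reshape(-1, int(np.prod(x.shape[1:])))      # same reshape as the layer performs
print(flat.shape)                                    # (32, 256): batch axis kept, everything else collapsed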
def tf_q_value(self, embedding, distr_params, action, name):
    num_action = util.prod(self.actions_spec[name]['shape'])

    mean, stddev, _ = distr_params
    flat_mean = tf.reshape(tensor=mean, shape=(-1, num_action))
    flat_stddev = tf.reshape(tensor=stddev, shape=(-1, num_action))

    # Advantage computation
    # Network outputs entries of lower triangular matrix L
    if self.l_entries[name] is None:
        l_matrix = flat_stddev
        l_matrix = tf.exp(l_matrix)
    else:
        l_matrix = tf.map_fn(fn=tf.diag, elems=flat_stddev)

        l_entries = self.l_entries[name].apply(x=embedding)
        l_entries = tf.exp(l_entries)
        offset = 0
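The two branches differ in the shape of l_matrix: with no extra l_entries head, L stays effectively diagonal (one independent curvature term per action dimension); otherwise tf.map_fn(fn=tf.diag, ...) first builds one diagonal matrix per batch entry, into which the exponentiated l_entries are presumably written in the truncated continuation. A NumPy sketch of what that map_fn call produces, with illustrative values only:

import numpy as np

flat_stddev = np.array([[0.5, 1.0],
                        [2.0, 0.1]])                          # (batch, num_action)
l_matrix = np.stack([np.diag(row) for row in flat_stddev])    # one diagonal matrix per batch entry
print(l_matrix.shape)                                         # (2, 2, 2), like map_fn(tf.diag, ...) in TF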
def create_tf_operations(self, config):
    # create an n-step reward placeholder for each action
    with tf.variable_scope('placeholder'):
        self.nstep_rewards = dict()
        for name, action in config.actions.items():
            # if shaped multi-action (i.e. action shape like (?, 2)), make a shaped n-step reward
            if util.prod(action.shape) > 1:
                shape = (None, util.prod(action.shape))
            else:
                shape = (None,)
            self.nstep_rewards[name] = tf.placeholder(dtype=tf.float32, shape=shape,
                                                      name='nstep-reward-{}'.format(name))

    super(DQNNstepModel, self).create_tf_operations(config)
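The placeholder only receives already-computed n-step returns; the return itself is the usual discounted sum R = sum_k gamma^k * r_k. A small sketch of the value that would be fed in; the function and feed names here are hypothetical, not taken from this model.

def nstep_return(rewards, gamma=0.99):
    # Discounted n-step return sum_k gamma^k * r_k over one slice of rewards.
    return float(sum((gamma ** k) * r for k, r in enumerate(rewards)))

# Hypothetical feed for a scalar action named 'action' (placeholder shape (None,)):
# feed_dict[model.nstep_rewards['action']] = [nstep_return([1.0, 0.0, 0.5]),
#                                             nstep_return([0.0, 1.0])]
print(nstep_return([1.0, 0.0, 0.5]))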
def processed_shape(self, shape):
    if shape[0] == -1:
        return -1, util.prod(shape[1:])
    return util.prod(shape),
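A standalone sketch of the two branches, with util.prod replaced by np.prod (a hypothetical free function with the same logic as the method above):

import numpy as np

def processed_shape(shape):
    # -1 in the leading position marks a batch dimension to preserve.
    if shape[0] == -1:
        return -1, int(np.prod(shape[1:]))
    return (int(np.prod(shape)),)

print(processed_shape((-1, 4, 4, 3)))   # (-1, 48): batch dimension preserved, the rest flattened
print(processed_shape((4, 4, 3)))       # (48,): everything collapsed into one axis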
def _create_action_outputs(network_output, quantized_steps, num_atoms, config, actions, num_actions):
    action_logits = dict()
    action_probabilities = dict()
    action_qvals = dict()
    action_taken = dict()

    for action in actions:
        logits = []
        probabilities = []
        qvals = []
        argmax = []
        # if shape of action != () we need to create another network head for each,
        # but always create at least 1
        for shaped_action in range(max([util.prod(config.actions[action].shape), 1])):
            # for each action create an output of length num_atoms;
            # this results in an output of shape (batch_size, num_actions, num_atoms).
            # tensors are immutable, so we must collect them in lists and stack later
            actions_and_logits = []
            actions_and_probabilities = []
            for action_ind in range(num_actions[action]):
                logits_output = layers['linear'](x=network_output, size=num_atoms,
                                                 scope='{}-{}-{}'.format(action, shaped_action, action_ind))
                # logits are stored for use in the loss function
                actions_and_logits.append(logits_output)
                # softmax
                actions_and_probabilities.append(layers['nonlinearity'](x=logits_output, name='softmax'))

            # actions_and_* have shape (batch_size, num_actions, num_atoms)
            actions_and_logits = tf.stack(actions_and_logits, axis=1)
            actions_and_probabilities = tf.stack(actions_and_probabilities, axis=1)
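The stacked softmax outputs form a categorical return distribution over num_atoms support points per action, as in C51-style distributional DQN; the Q-value of each action is then the expectation over that support, which is presumably what action_qvals ends up holding (the snippet's quantized_steps would supply the support values). A NumPy sketch of that expectation step, with made-up support values:

import numpy as np

batch_size, num_actions, num_atoms = 2, 3, 51
atom_values = np.linspace(-10.0, 10.0, num_atoms)                  # support of the return distribution
probabilities = np.random.dirichlet(np.ones(num_atoms), size=(batch_size, num_actions))

qvals = (probabilities * atom_values).sum(axis=-1)                 # (batch_size, num_actions)
greedy_action = qvals.argmax(axis=-1)                              # argmax over expected returns
print(qvals.shape, greedy_action)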
def __init__(self, variables):
    self.session = None

    shapes = [util.shape(variable) for variable in variables]
    total_size = sum(util.prod(shape) for shape in shapes)
    self.theta = tf.placeholder(tf.float32, [total_size])

    start = 0
    assigns = []
    for (shape, variable) in zip(shapes, variables):
        size = util.prod(shape)
        assigns.append(tf.assign(variable, tf.reshape(self.theta[start:start + size], shape)))
        start += size
    self.set_op = tf.group(*assigns)
    self.get_op = tf.concat(axis=0, values=[tf.reshape(variable, (-1,)) for variable in variables])
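What set_op and get_op accomplish, shown as a NumPy round trip over the same flat layout (a sketch only; the real ops run tf.assign and tf.concat inside a session):

import numpy as np

shapes = [(2, 3), (4,)]
theta = np.arange(10, dtype=np.float32)            # flat vector, total_size = 6 + 4

start, unpacked = 0, []
for shape in shapes:
    size = int(np.prod(shape))
    unpacked.append(theta[start:start + size].reshape(shape))   # what each tf.assign receives
    start += size

repacked = np.concatenate([v.reshape(-1) for v in unpacked])    # what get_op concatenates
print(np.array_equal(repacked, theta))                          # True: set/get round-trips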
def tf_apply(self, x, update):
    return tf.reshape(tensor=x, shape=(-1, util.prod(util.shape(x)[1:])))