    # Continuation of an earlier branch of the initializer type dispatch
    if util.shape(x=initializer) != shape:
        raise TensorforceError(
            "Invalid variable initializer shape: {}.".format(util.shape(x=initializer))
        )
    initializer = initializer  # tensor already has the correct shape; use as-is
elif not isinstance(initializer, str):
    raise TensorforceError("Invalid variable initializer: {}.".format(initializer))
elif initializer[:6] == 'normal':
    if dtype != 'float':
        raise TensorforceError(
            message="Invalid variable initializer value for non-float variable: {}.".format(
                initializer
            )
        )
    if initializer[6:] == '-relu':
        # He-style initialization: stddev = sqrt(2 / fan_in), suited to ReLU
        # activations (capped at 0.1)
        stddev = min(0.1, sqrt(2.0 / util.product(xs=shape[:-1])))
    else:
        # Glorot-style initialization: stddev = sqrt(2 / (fan_in + fan_out)),
        # likewise capped at 0.1
        stddev = min(0.1, sqrt(2.0 / (util.product(xs=shape[:-1]) + shape[-1])))
    initializer = tf.random.normal(shape=shape, stddev=stddev, dtype=tf_dtype)
elif initializer[:10] == 'orthogonal':
    if dtype != 'float':
        raise TensorforceError(
            message="Invalid variable initializer value for non-float variable: {}.".format(
                initializer
            )
        )
    if len(shape) < 2:
        raise TensorforceError(
            message="Invalid variable initializer value for 0/1-rank variable: {}.".format(
                initializer
            )
        )
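# The snippet above shows only the validation for the 'orthogonal' case. A
# minimal, self-contained sketch of how an orthogonal initial value can be
# produced (QR decomposition of a random Gaussian matrix); this illustrates
# the technique and is not necessarily Tensorforce's exact code.
import numpy as np

def orthogonal_init(shape, scale=1.0):
    # Flatten all leading dimensions so the variable becomes a 2D matrix
    num_rows = int(np.prod(shape[:-1]))
    num_cols = shape[-1]
    flat_shape = (max(num_rows, num_cols), min(num_rows, num_cols))
    # QR decomposition of a random normal matrix yields an orthonormal Q
    q, r = np.linalg.qr(np.random.standard_normal(flat_shape))
    # Sign correction makes the distribution uniform over orthogonal matrices
    q *= np.sign(np.diag(r))
    if num_rows < num_cols:
        q = q.T
    return scale * q.reshape(shape)

weights = orthogonal_init(shape=(4, 4))  # e.g. a square recurrent kernel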
def tf_q_value(self, embedding, parameters, action, name):
    num_action = util.product(xs=self.actions_spec[name]['shape'])
    mean, stddev, _ = parameters
    flat_mean = tf.reshape(tensor=mean, shape=(-1, num_action))
    flat_stddev = tf.reshape(tensor=stddev, shape=(-1, num_action))
    # Advantage computation
    # Network outputs entries of the lower triangular matrix L
    if self.l_entries[name] is None:
        # Diagonal-only case: keep a flat vector, exponentiated so all
        # entries are positive
        l_matrix = tf.exp(flat_stddev)
    else:
        # Full lower-triangular case: place the stddev values on the
        # diagonal, then fill in the network-predicted off-diagonal entries
        l_matrix = tf.linalg.diag(diagonal=flat_stddev)
        l_entries = self.l_entries[name].apply(x=embedding)
        l_entries = tf.exp(l_entries)
        offset = 0
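# The loop that fills the off-diagonal columns of L is cut off above. A
# self-contained sketch of the rest of the NAF Q-value (Gu et al., 2016),
# assuming l_matrix has been assembled into shape (batch, n, n): with
# P = L L^T positive semi-definite, the advantage is a negative quadratic in
# the action and Q(s, a) = V(s) + A(s, a). Names below are illustrative.
import tensorflow as tf

def naf_q_value(l_matrix, flat_mean, flat_action, state_value):
    # P = L L^T, shape (batch, n, n), positive semi-definite by construction
    p_matrix = tf.matmul(l_matrix, l_matrix, transpose_b=True)
    diff = flat_action - flat_mean  # (batch, n)
    # A(s, a) = -1/2 * (a - mu)^T P (a - mu)
    advantage = -0.5 * tf.reduce_sum(diff * tf.linalg.matvec(p_matrix, diff), axis=1)
    return state_value + advantage  # (batch,)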
def get_output_spec(self, input_spec):
    if self.reduction == 'concat':
        # Concatenation flattens all dimensions into a single axis
        input_spec['shape'] = (util.product(xs=input_spec['shape']),)
    elif self.reduction in ('max', 'mean', 'product', 'sum'):
        # Pooling reductions collapse everything except the last dimension
        input_spec['shape'] = (input_spec['shape'][-1],)
    # Value bounds no longer hold after the reduction
    input_spec.pop('min_value', None)
    input_spec.pop('max_value', None)
    return input_spec
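# Illustrative effect of the transformation above on a hypothetical spec
# (example values, not taken from the library's tests):
#   reduction='concat': shape (2, 3, 4) -> (24,)
#   reduction='mean':   shape (2, 3, 4) -> (4,)
# Any min_value/max_value bounds are dropped in both cases.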
def __init__(
    self, name, action_spec, embedding_shape, infer_states_value=True, summary_labels=None
):
    super().__init__(
        name=name, action_spec=action_spec, embedding_shape=embedding_shape,
        summary_labels=summary_labels
    )
    input_spec = dict(type='float', shape=self.embedding_shape)
    num_values = self.action_spec['num_values']
    if len(self.embedding_shape) == 1:
        action_size = util.product(xs=self.action_spec['shape'])
        self.deviations = self.add_module(
            name='deviations', module='linear', modules=layer_modules,
            size=(action_size * num_values), input_spec=input_spec
        )
        if infer_states_value:
            # State value will be inferred from the action values, so no
            # separate linear head is created
            self.value = None
        else:
            self.value = self.add_module(
                name='value', module='linear', modules=layer_modules, size=action_size,
                input_spec=input_spec
            )
    else:
        if len(self.embedding_shape) < 1 or len(self.embedding_shape) > 3:
            raise TensorforceError.unexpected()
        if self.embedding_shape[:-1] == self.action_spec['shape'][:-1]:
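# The __init__ above is truncated. When infer_states_value is True, no value
# head exists and V(s) must be recovered from the per-action outputs; the
# dueling aggregation of Wang et al. (2016) is the standard way to do this.
# A sketch under that assumption, with hypothetical names, not necessarily
# Tensorforce's exact code:
import tensorflow as tf

def dueling_action_values(advantages, state_value=None):
    # advantages: (batch, num_actions); state_value: (batch,) or None
    if state_value is None:
        # Infer V(s) as the mean of the action values
        state_value = tf.reduce_mean(advantages, axis=1)
    centered = advantages - tf.reduce_mean(advantages, axis=1, keepdims=True)
    # Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a))
    return tf.expand_dims(state_value, axis=1) + centered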
def tf_loss_per_instance(
    self, states, internals, actions, terminal, reward, next_states, next_internals,
    reference=None
):
    embedding = self.network.apply(x=states, internals=internals)
    log_probs = list()
    for name, distribution, action in util.zip_items(self.distributions, actions):
        parameters = distribution.parametrize(x=embedding)
        log_prob = distribution.log_probability(parameters=parameters, action=action)
        collapsed_size = util.product(xs=util.shape(log_prob)[1:])
        log_prob = tf.reshape(tensor=log_prob, shape=(-1, collapsed_size))
        log_probs.append(log_prob)
    log_probs = tf.concat(values=log_probs, axis=1)
    if reference is None:
        old_log_probs = tf.stop_gradient(input=log_probs)
    else:
        old_log_probs = reference
    # When reference is None the ratio below is identically 1.0, but its
    # gradient w.r.t. log_probs is still the policy-gradient direction,
    # since the denominator is wrapped in stop_gradient
    prob_ratios = tf.exp(x=(log_probs - old_log_probs))
    prob_ratio_per_instance = tf.reduce_mean(input_tensor=prob_ratios, axis=1)
    likelihood_ratio_clipping = self.likelihood_ratio_clipping.value()
    # Clipping bounds assumed: the standard PPO interval (1 - eps, 1 + eps);
    # the source snippet does not show them
    clipped_prob_ratio_per_instance = tf.clip_by_value(
        t=prob_ratio_per_instance,
        clip_value_min=(1.0 - likelihood_ratio_clipping),
        clip_value_max=(1.0 + likelihood_ratio_clipping)
    )
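# A minimal sketch of how the clipped ratio typically feeds the final PPO
# surrogate objective (Schulman et al., 2017); the advantage weighting and
# sign convention below are the standard form, assumed rather than taken
# from this codebase:
import tensorflow as tf

def clipped_surrogate_loss(prob_ratio, advantage, epsilon=0.2):
    clipped = tf.clip_by_value(prob_ratio, 1.0 - epsilon, 1.0 + epsilon)
    # Pessimistic bound: elementwise minimum of the unclipped and clipped
    # terms, negated because optimizers minimize
    return -tf.minimum(prob_ratio * advantage, clipped * advantage)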
def tf_states_value(
    self, states, internals, auxiliaries, reduced=True, include_per_action=False
):
    states_values = self.states_values(
        states=states, internals=internals, auxiliaries=auxiliaries
    )
    for name, spec, states_value in util.zip_items(self.actions_spec, states_values):
        states_values[name] = tf.reshape(
            tensor=states_value, shape=(-1, util.product(xs=spec['shape']))
        )
    states_value = tf.concat(values=tuple(states_values.values()), axis=1)
    if reduced:
        states_value = tf.math.reduce_mean(input_tensor=states_value, axis=1)
        if include_per_action:
            for name in self.actions_spec:
                states_values[name] = tf.math.reduce_mean(
                    input_tensor=states_values[name], axis=1
                )
    if include_per_action:
        states_values['*'] = states_value
        return states_values
    else:
        return states_value
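# A tiny concrete illustration of the reshape/concat/reduce pattern above,
# with two hypothetical actions of shapes (2,) and (3,): per-action values
# are flattened to (batch, size), concatenated, and averaged per instance.
import tensorflow as tf

values_a = tf.zeros(shape=(8, 2))  # action 'a' values, flattened
values_b = tf.ones(shape=(8, 3))   # action 'b' values, flattened
combined = tf.concat(values=(values_a, values_b), axis=1)           # (8, 5)
states_value = tf.math.reduce_mean(input_tensor=combined, axis=1)   # (8,)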
def tf_kl_divergence(
    self, states, internals, auxiliaries, other=None, reduced=True, include_per_action=False
):
    kl_divergences = self.kl_divergences(
        states=states, internals=internals, auxiliaries=auxiliaries, other=other
    )
    for name, spec, kl_divergence in util.zip_items(self.actions_spec, kl_divergences):
        kl_divergences[name] = tf.reshape(
            tensor=kl_divergence, shape=(-1, util.product(xs=spec['shape']))
        )
    kl_divergence = tf.concat(values=tuple(kl_divergences.values()), axis=1)
    if reduced:
        kl_divergence = tf.math.reduce_mean(input_tensor=kl_divergence, axis=1)
        if include_per_action:
            for name in self.actions_spec:
                kl_divergences[name] = tf.math.reduce_mean(
                    input_tensor=kl_divergences[name], axis=1
                )
    if include_per_action:
        kl_divergences['*'] = kl_divergence
        return kl_divergences
    else:
        return kl_divergence
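# Each distribution supplies its own per-action KL term. For a diagonal
# Gaussian policy the per-dimension KL has a well-known closed form,
# sketched here for illustration (not necessarily this library's exact
# implementation):
import tensorflow as tf

def gaussian_kl(mean1, stddev1, mean2, stddev2):
    # KL(N(m1, s1) || N(m2, s2))
    #   = log(s2 / s1) + (s1^2 + (m1 - m2)^2) / (2 s2^2) - 1/2
    return tf.math.log(stddev2 / stddev1) + (
        tf.square(stddev1) + tf.square(mean1 - mean2)
    ) / (2.0 * tf.square(stddev2)) - 0.5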