@staticmethod
def create_vector_observation_encoder(
    observation_input: tf.Tensor,
    h_size: int,
    activation: ActivationFunction,
    num_layers: int,
    scope: str,
    reuse: bool,
) -> tf.Tensor:
    """
    Builds a set of hidden state encoders.
    :param reuse: Whether to re-use the weights within the same scope.
    :param scope: Graph scope for the encoder ops.
    :param observation_input: Input vector.
    :param h_size: Hidden layer size.
    :param activation: What type of activation function to use for layers.
    :param num_layers: Number of hidden layers to create.
    :return: The last hidden layer tensor.
    """
    with tf.variable_scope(scope):
        hidden = observation_input
        for i in range(num_layers):
            hidden = tf.layers.dense(
                hidden,
                h_size,
                activation=activation,
                reuse=reuse,
                name="hidden_{}".format(i),
                kernel_initializer=tf.initializers.variance_scaling(1.0),
            )
    return hidden
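# Illustrative usage only; the placeholder shape, layer sizes, and scope name
# below are assumptions, not taken from the original source:
# obs_in = tf.placeholder(shape=[None, 64], dtype=tf.float32, name="vector_obs")
# hidden = create_vector_observation_encoder(
#     obs_in, h_size=128, activation=tf.nn.relu, num_layers=2,
#     scope="example_encoder", reuse=False,
# )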
with tf.variable_scope(self.join_scopes(scope, "q1_encoding"), reuse=reuse):
    q1_hidden = self.create_vector_observation_encoder(
        hidden_input, h_size, self.activ_fn, num_layers, "q1_encoder", reuse
    )
    if self.use_recurrent:
        q1_hidden, memory_out = self.create_recurrent_encoder(
            q1_hidden, self.q1_memory_in, self.sequence_length, name="lstm_q1"
        )
        self.q1_memory_out = memory_out

    q1_heads = {}
    for name in stream_names:
        _q1 = tf.layers.dense(q1_hidden, num_outputs, name="{}_q1".format(name))
        q1_heads[name] = _q1

    q1 = tf.reduce_mean(list(q1_heads.values()), axis=0)
with tf.variable_scope(self.join_scopes(scope, "q2_encoding"), reuse=reuse):
    q2_hidden = self.create_vector_observation_encoder(
        hidden_input, h_size, self.activ_fn, num_layers, "q2_encoder", reuse
    )
    if self.use_recurrent:
        q2_hidden, memory_out = self.create_recurrent_encoder(
            q2_hidden, self.q2_memory_in, self.sequence_length, name="lstm_q2"
        )
        self.q2_memory_out = memory_out

    q2_heads = {}
    for name in stream_names:
        _q2 = tf.layers.dense(q2_hidden, num_outputs, name="{}_q2".format(name))
        q2_heads[name] = _q2

    q2 = tf.reduce_mean(list(q2_heads.values()), axis=0)
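# Illustrative sketch, not from the original source: SAC-style critics keep twin
# Q heads so that targets can be built from the smaller of the two estimates
# (clipped double-Q), which limits value overestimation, e.g.:
# min_q = tf.minimum(q1, q2)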
if self.use_recurrent:
    hidden_policy, memory_policy_out = ModelUtils.create_recurrent_encoder(
        hidden_policy,
        self.memory_in,
        self.sequence_length_ph,
        name="lstm_policy",
    )
    self.memory_out = tf.identity(memory_policy_out, "recurrent_out")
else:
    hidden_policy = encoded

self.action_masks = tf.placeholder(
    shape=[None, sum(self.act_size)], dtype=tf.float32, name="action_masks"
)
with tf.variable_scope("policy"):
    distribution = MultiCategoricalDistribution(
        hidden_policy, self.act_size, self.action_masks
    )

# It's important that we are able to feed_dict a value into this tensor to get the
# right one-hot encoding, so we can't do identity on it.
self.output = distribution.sample
self.all_log_probs = tf.identity(distribution.log_probs, name="action")
self.selected_actions = tf.stop_gradient(
    distribution.sample_onehot
)  # In discrete control, these are one-hot
self.entropy = distribution.entropy
self.total_log_probs = distribution.total_log_probs
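# Illustrative only; the feed below (and the use of numpy) is an assumption about
# how the mask placeholder is typically driven, not code from the original source.
# Feeding all ones marks every action in every branch as available:
# feed_dict[self.action_masks] = np.ones((batch_size, sum(self.act_size)), dtype=np.float32)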
def create_encoder(
    self, state_in: tf.Tensor, action_in: tf.Tensor, done_in: tf.Tensor, reuse: bool
) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """
    Creates the encoder for the discriminator.
    :param state_in: The encoded observation input
    :param action_in: The action input
    :param done_in: The done flags input
    :param reuse: If true, the weights will be shared with the previous encoder created
    """
    with tf.variable_scope("GAIL_model"):
        if self.use_actions:
            concat_input = tf.concat([state_in, action_in, done_in], axis=1)
        else:
            concat_input = state_in

        hidden_1 = tf.layers.dense(
            concat_input,
            self.h_size,
            activation=ModelUtils.swish,
            name="gail_d_hidden_1",
            reuse=reuse,
        )
        hidden_2 = tf.layers.dense(
            hidden_1,
            self.h_size,
            # The remaining arguments are assumed to mirror hidden_1.
            activation=ModelUtils.swish,
            name="gail_d_hidden_2",
            reuse=reuse,
        )
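# Illustrative usage only; the expert/policy input names below are assumptions.
# The reuse flag lets the same discriminator weights score both the expert batch
# and the policy batch:
# expert_out = self.create_encoder(encoded_expert, expert_action, done_expert, reuse=False)
# policy_out = self.create_encoder(encoded_policy, policy_action, done_policy, reuse=True)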
def _create_encoder(
    self,
    visual_in: List[tf.Tensor],
    vector_in: tf.Tensor,
    h_size: int,
    num_layers: int,
    vis_encode_type: EncoderType,
) -> tf.Tensor:
    """
    Creates an encoder for visual and vector observations.
    :param visual_in: List of visual observation tensors.
    :param vector_in: Vector observation tensor.
    :param h_size: Size of hidden linear layers.
    :param num_layers: Number of hidden linear layers.
    :param vis_encode_type: Type of visual encoder to use if visual input.
    :return: The hidden layer (tf.Tensor) after the encoder.
    """
    with tf.variable_scope("policy"):
        encoded = ModelUtils.create_observation_streams(
            self.visual_in,
            self.processed_vector_in,
            1,
            h_size,
            num_layers,
            vis_encode_type,
        )[0]
    return encoded
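# Illustrative usage only; the hyperparameter values and the EncoderType.SIMPLE
# choice below are assumptions, not taken from the original source:
# encoded = self._create_encoder(
#     self.visual_in, self.processed_vector_in, h_size=128, num_layers=2,
#     vis_encode_type=EncoderType.SIMPLE,
# )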
prev_action_oh = tf.concat(
    [
        tf.one_hot(self.prev_action[:, i], self.act_size[i])
        for i in range(len(self.act_size))
    ],
    axis=1,
)
hidden_policy = tf.concat([hidden_policy, prev_action_oh], axis=1)

hidden_policy, memory_out = self.create_recurrent_encoder(
    hidden_policy,
    self.policy_memory_in,
    self.sequence_length,
    name="lstm_policy",
)
self.policy_memory_out = memory_out
with tf.variable_scope(scope):
    policy_branches = []
    for size in self.act_size:
        policy_branches.append(
            tf.layers.dense(
                hidden_policy,
                size,
                activation=None,
                use_bias=False,
                kernel_initializer=tf.initializers.variance_scaling(0.01),
            )
        )
    all_logits = tf.concat(policy_branches, axis=1, name="action_probs")

    output, normalized_probs, normalized_logprobs = self.create_discrete_action_masking_layer(
        all_logits, self.action_masks, self.act_size
    )
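# Illustrative sketch of what a discrete action masking layer does; this is not
# the repository's implementation. Per action branch, the softmax probabilities
# are multiplied by the 0/1 mask and renormalized, so masked actions get zero
# probability before sampling:
# branch_probs = tf.nn.softmax(branch_logits)
# masked = branch_probs * branch_mask
# renormalized = masked / (tf.reduce_sum(masked, axis=1, keepdims=True) + 1e-10)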
def create_reward_signals(self, reward_signal_configs):
    """
    Create reward signals.
    :param reward_signal_configs: Reward signal config.
    """
    with self.graph.as_default():
        with tf.variable_scope(TOWER_SCOPE_NAME, reuse=tf.AUTO_REUSE):
            for device_id, device in enumerate(self.devices):
                with tf.device(device):
                    reward_tower = {}
                    for reward_signal, config in reward_signal_configs.items():
                        reward_tower[reward_signal] = create_reward_signal(
                            self, self.towers[device_id], reward_signal, config
                        )
                        for k, v in reward_tower[reward_signal].update_dict.items():
                            self.update_dict[k + "_" + str(device_id)] = v
                    self.reward_signal_towers.append(reward_tower)
        for _, reward_tower in self.reward_signal_towers[0].items():
            for _, update_key in reward_tower.stats_name_to_update_name.items():
                all_reward_signal_stats = tf.stack(
                    [
                        self.update_dict[update_key + "_" + str(i)]
                        for i in range(len(self.towers))
                    ]
                )
                # Assumed continuation: average the stacked per-tower stats back
                # into a single update_dict entry.
                self.update_dict[update_key] = tf.reduce_mean(
                    all_reward_signal_stats, axis=0
                )
def create_sac_value_head(
    self, stream_names, hidden_input, num_layers, h_size, scope
):
    """
    Creates one value estimator head for each reward signal in stream_names.
    Also creates the node corresponding to the mean of all the value heads in self.value.
    self.value_heads is a dictionary mapping each stream name to the node holding the
    value estimator head for that reward signal.
    :param stream_names: The list of reward signal names.
    :param hidden_input: The last layer of the critic. The heads consist of one dense
        hidden layer on top of this input.
    :param num_layers: Number of hidden layers for the value network.
    :param h_size: Size of the hidden layers for the value network.
    :param scope: TF scope for the value network.
    """
    with tf.variable_scope(scope):
        value_hidden = self.create_vector_observation_encoder(
            hidden_input, h_size, self.activ_fn, num_layers, "encoder", False
        )
        if self.use_recurrent:
            value_hidden, memory_out = self.create_recurrent_encoder(
                value_hidden,
                self.value_memory_in,
                self.sequence_length,
                name="lstm_value",
            )
            self.value_memory_out = memory_out
        self.create_value_heads(stream_names, value_hidden)
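# Illustrative sketch of the per-stream value heads described in the docstring
# above; the names and layer shapes here are assumptions, not necessarily the
# repository's create_value_heads implementation:
# self.value_heads = {}
# for name in stream_names:
#     self.value_heads[name] = tf.layers.dense(value_hidden, 1, name="{}_value".format(name))
# self.value = tf.reduce_mean(list(self.value_heads.values()), axis=0)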