def test_huber_loss(self):
    # [0, 0] and [1, 1] -> [0.5, 0.5]
    y_target = np.array([0., 0.])
    y_pred = np.array([1., 1.])
    expected = np.array([0.5, 0.5])
    loss = huber_loss(y_target, y_pred)
    print(loss)
    # self.assertEqual(expected, loss.numpy())

    y_target = np.array([0., 0.])
    y_pred = np.array([10., 10.])
    expected = np.array([10., 10.])
    loss = huber_loss(y_target, y_pred)
    print(loss)
    # self.assertEqual(expected, loss.numpy())
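For orientation, here is a minimal sketch of a Huber loss in the two-argument form the test calls, assuming a signature huber_loss(y_target, y_pred, delta=1.0) and the standard piecewise definition (quadratic up to delta, linear beyond); this is an assumption for illustration, not necessarily the library's exact implementation. Note that under this definition the second case above evaluates to 9.5 per element rather than 10, which may be why the assertions are commented out.

import numpy as np
import tensorflow as tf

def huber_loss(y_target, y_pred, delta=1.0):
    # Hypothetical two-argument form matching the test above:
    # 0.5 * err^2 where |err| <= delta, delta * (|err| - 0.5 * delta) otherwise.
    err = tf.cast(y_target, tf.float32) - tf.cast(y_pred, tf.float32)
    abs_err = tf.abs(err)
    quadratic = 0.5 * tf.square(err)
    linear = delta * (abs_err - 0.5 * delta)
    return tf.where(abs_err <= delta, quadratic, linear)

print(huber_loss(np.array([0., 0.]), np.array([1., 1.])).numpy())    # [0.5 0.5]
print(huber_loss(np.array([0., 0.]), np.array([10., 10.])).numpy())  # [9.5 9.5]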
def _train_body(self, states, actions, next_states, rewards, done, weights):
    with tf.device(self.device):
        with tf.GradientTape() as tape:
            if self._enable_categorical_dqn:
                td_errors = self._compute_td_error_body_distributional(
                    states, actions, next_states, rewards, done)
                q_func_loss = tf.reduce_mean(
                    huber_loss(tf.negative(td_errors),
                               delta=self.max_grad) * weights)
            else:
                td_errors = self._compute_td_error_body(
                    states, actions, next_states, rewards, done)
                q_func_loss = tf.reduce_mean(
                    huber_loss(td_errors,
                               delta=self.max_grad) * weights)

        q_func_grad = tape.gradient(
            q_func_loss, self.q_func.trainable_variables)
        self.q_func_optimizer.apply_gradients(
            zip(q_func_grad, self.q_func.trainable_variables))

        return td_errors, q_func_loss
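The delta argument is named max_grad in these training bodies for a reason: the derivative of the Huber loss with respect to the TD error equals the error inside the quadratic region and saturates at ±delta outside it, so a single outlier transition cannot produce an arbitrarily large gradient. A small self-contained check of that property, using a standalone single-argument Huber function (an assumption about the form applied to the TD error here):

import tensorflow as tf

def huber(x, delta=1.0):
    # Single-argument form applied directly to the TD error.
    abs_x = tf.abs(x)
    return tf.where(abs_x <= delta, 0.5 * tf.square(x), delta * (abs_x - 0.5 * delta))

td_errors = tf.constant([0.3, 2.0, -50.0])
with tf.GradientTape() as tape:
    tape.watch(td_errors)
    loss = tf.reduce_sum(huber(td_errors, delta=1.0))
# Gradient magnitude never exceeds delta: -> [0.3, 1.0, -1.0]
print(tape.gradient(loss, td_errors).numpy())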
def _train_body(self, states, actions, next_states, rewards, done, weights):
    with tf.device(self.device):
        with tf.GradientTape() as tape:
            td_errors = self._compute_td_error_body(
                states, actions, next_states, rewards, done)
            critic_loss = tf.reduce_mean(
                huber_loss(td_errors, delta=self.max_grad) * weights)

        critic_grad = tape.gradient(
            critic_loss, self.critic.trainable_variables)
        self.critic_optimizer.apply_gradients(
            zip(critic_grad, self.critic.trainable_variables))

        with tf.GradientTape() as tape:
            next_action = self.actor(states)
            actor_loss = -tf.reduce_mean(self.critic([states, next_action]))

        actor_grad = tape.gradient(
            actor_loss, self.actor.trainable_variables)
        self.actor_optimizer.apply_gradients(
            zip(actor_grad, self.actor.trainable_variables))

        # Update target networks
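The snippet stops at the target-network update. A common way to write that step is a soft (Polyak) update; the sketch below uses hypothetical names (soft_update, tau, self.critic_target) and is illustrative rather than this repository's helper:

import tensorflow as tf

def soft_update(target_vars, source_vars, tau=0.005):
    # target <- tau * source + (1 - tau) * target, applied variable by variable.
    for t_var, s_var in zip(target_vars, source_vars):
        t_var.assign(tau * s_var + (1. - tau) * t_var)

# e.g. soft_update(self.critic_target.weights, self.critic.weights, tau=self.tau)
#      soft_update(self.actor_target.weights, self.actor.weights, tau=self.tau)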
if tf.rank(rewards) == 2:
    rewards = tf.squeeze(rewards, axis=1)
not_dones = 1. - tf.cast(dones, dtype=tf.float32)

with tf.GradientTape(persistent=True) as tape:
    # Compute loss of critic Q
    current_q1 = self.qf1([states, actions])
    current_q2 = self.qf2([states, actions])
    vf_next_target = self.vf_target(next_states)
    target_q = tf.stop_gradient(
        rewards + not_dones * self.discount * vf_next_target)
    td_loss_q1 = tf.reduce_mean(huber_loss(
        target_q - current_q1, delta=self.max_grad) * weights)
    td_loss_q2 = tf.reduce_mean(huber_loss(
        target_q - current_q2, delta=self.max_grad) * weights)  # Eq.(7)

    # Compute loss of critic V
    current_v = self.vf(states)
    sample_actions, logp, _ = self.actor(states)  # Resample actions to update V
    current_q1 = self.qf1([states, sample_actions])
    current_q2 = self.qf2([states, sample_actions])
    current_min_q = tf.minimum(current_q1, current_q2)
    target_v = tf.stop_gradient(
        current_min_q - self.alpha * logp)
    td_errors = target_v - current_v
    td_loss_v = tf.reduce_mean(
        huber_loss(td_errors, delta=self.max_grad) * weights)  # Eq.(5)
    # Compute loss of policy
    policy_loss = tf.reduce_mean(
        (self.alpha * logp - current_min_q) * weights)  # Eq.(12)

    # Compute loss of temperature parameter for entropy
    if self.auto_alpha:
        alpha_loss = -tf.reduce_mean(
            self.log_alpha * tf.stop_gradient(logp + self.target_alpha))

q1_grad = tape.gradient(td_loss_q1, self.qf1.trainable_variables)
self.qf1_optimizer.apply_gradients(
    zip(q1_grad, self.qf1.trainable_variables))
q2_grad = tape.gradient(td_loss_q2, self.qf2.trainable_variables)
self.qf2_optimizer.apply_gradients(
    zip(q2_grad, self.qf2.trainable_variables))
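Written out, the losses assembled inside the persistent tape above are the following, where w is the importance weight, d the done flag, γ the discount, ℓ_δ the Huber penalty with δ = max_grad standing in for the squared error, sg(·) the stop-gradient, and H̄ the target_alpha (target entropy):

\begin{aligned}
L_{Q_i}    &= \mathbb{E}\left[ w\, \ell_\delta\!\left( r + \gamma (1 - d)\, V_{\mathrm{target}}(s') - Q_i(s, a) \right) \right], \quad i \in \{1, 2\}  && \text{(Eq. 7)} \\
L_{V}      &= \mathbb{E}\left[ w\, \ell_\delta\!\left( \min_i Q_i(s, \tilde{a}) - \alpha \log \pi(\tilde{a} \mid s) - V(s) \right) \right], \quad \tilde{a} \sim \pi(\cdot \mid s)  && \text{(Eq. 5)} \\
L_{\pi}    &= \mathbb{E}\left[ w \left( \alpha \log \pi(\tilde{a} \mid s) - \min_i Q_i(s, \tilde{a}) \right) \right]  && \text{(Eq. 12)} \\
L_{\alpha} &= -\,\mathbb{E}\left[ \log\alpha \cdot \mathrm{sg}\!\left( \log \pi(\tilde{a} \mid s) + \bar{\mathcal{H}} \right) \right]
\end{aligned}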
next_q = tf.minimum(
    self.qf1_target(next_states), self.qf2_target(next_states))
# Compute state value function V by directly computing the expectation
target_q = tf.expand_dims(tf.einsum(
    'ij,ij->i', next_action_prob, next_q - self.alpha * next_action_logp), axis=1)  # Eq.(10)
target_q = tf.stop_gradient(
    rewards + not_dones * self.discount * target_q)
current_q1 = self.qf1(states)
current_q2 = self.qf2(states)
td_loss1 = tf.reduce_mean(huber_loss(
    target_q - tf.expand_dims(tf.gather_nd(current_q1, indices), axis=1),
    delta=self.max_grad) * weights)
td_loss2 = tf.reduce_mean(huber_loss(
    target_q - tf.expand_dims(tf.gather_nd(current_q2, indices), axis=1),
    delta=self.max_grad) * weights)  # Eq.(7)

# Compute actor loss
_, _, current_action_param = self.actor(states)
current_action_prob = current_action_param["prob"]
current_action_logp = tf.math.log(current_action_prob + 1e-8)
policy_loss = tf.reduce_mean(
    tf.einsum('ij,ij->i', current_action_prob,
              self.alpha * current_action_logp - tf.stop_gradient(
                  tf.minimum(current_q1, current_q2))) * weights)  # Eq.(12)
mean_ent = tf.reduce_mean(
    tf.einsum('ij,ij->i', current_action_prob, current_action_logp)) * (-1)

if self.auto_alpha:
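In this discrete-action variant, tf.einsum('ij,ij->i', prob, values) is a batched dot product: for each state in the batch it sums prob * values over the action axis, i.e. an expectation under the policy's action distribution, which is what the Eq.(10) and Eq.(12) lines above rely on. A tiny standalone check:

import tensorflow as tf

prob = tf.constant([[0.2, 0.8],
                    [0.5, 0.5]])    # per-state action probabilities
values = tf.constant([[1.0, 2.0],
                      [3.0, 5.0]])  # per-state, per-action values
expectation = tf.einsum('ij,ij->i', prob, values)
print(expectation.numpy())  # [1.8 4. ] == sum_a prob[i, a] * values[i, a]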
def _train_body(self, states, actions, next_states, rewards, done, weights):
    with tf.device(self.device):
        with tf.GradientTape() as tape:
            td_error1, td_error2 = self._compute_td_error_body(
                states, actions, next_states, rewards, done)
            critic_loss = tf.reduce_mean(huber_loss(td_error1, delta=self.max_grad) * weights) + \
                tf.reduce_mean(huber_loss(td_error2, delta=self.max_grad) * weights)

        critic_grad = tape.gradient(
            critic_loss, self.critic.trainable_variables)
        self.critic_optimizer.apply_gradients(
            zip(critic_grad, self.critic.trainable_variables))

        self._it.assign_add(1)
        with tf.GradientTape() as tape:
            next_actions = self.actor(states)
            actor_loss = -tf.reduce_mean(self.critic([states, next_actions]))

        remainder = tf.math.mod(self._it, self._actor_update_freq)

        def optimize_actor():
            actor_grad = tape.gradient(
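The snippet is cut off inside optimize_actor. The delayed policy update it is setting up, with remainder gating how often the actor is touched, typically finishes along these lines; this is an illustrative sketch, not necessarily the repository's exact continuation:

def optimize_actor():
    actor_grad = tape.gradient(actor_loss, self.actor.trainable_variables)
    self.actor_optimizer.apply_gradients(
        zip(actor_grad, self.actor.trainable_variables))
    return tf.constant(True)

# Only update the actor every `_actor_update_freq` critic updates.
tf.cond(tf.equal(remainder, 0), optimize_actor, lambda: tf.constant(False))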