def define_learn(self, obs, action, reward, next_obs, terminal):
    """Update the value model self.model with the DQN algorithm."""
    # Q-values of the current observations from the learned model.
    pred_value = self.model.value(obs)
    # Q-values of the next observations from the target model.
    next_pred_value = self.target_model.value(next_obs)
    # Bootstrapped value: max over next actions, detached from the gradient.
    best_v = layers.reduce_max(next_pred_value, dim=1)
    best_v.stop_gradient = True
    # TD target: r + gamma * max_a' Q_target(s', a'), zeroed at terminal states.
    target = reward + (
        1.0 - layers.cast(terminal, dtype='float32')) * self.gamma * best_v

    # Pick out Q(s, a) for the taken actions via a one-hot mask.
    action_onehot = layers.one_hot(action, self.action_dim)
    action_onehot = layers.cast(action_onehot, dtype='float32')
    pred_action_value = layers.reduce_sum(
        layers.elementwise_mul(action_onehot, pred_value), dim=1)

    # Mean squared TD error, minimized with Adam.
    cost = layers.square_error_cost(pred_action_value, target)
    cost = layers.reduce_mean(cost)
    optimizer = fluid.optimizer.Adam(self.lr, epsilon=1e-3)
    optimizer.minimize(cost)
    return cost
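# A minimal, self-contained sketch of the same DQN update graph, with a small
# fc layer standing in for self.model / self.target_model. All dimensions, the
# learning rate and gamma below are made up for illustration; they are not
# taken from the snippet above.
import numpy as np
import paddle.fluid as fluid
from paddle.fluid import layers

OBS_DIM, ACT_DIM, GAMMA = 4, 2, 0.99

learn_prog, startup_prog = fluid.Program(), fluid.Program()
with fluid.program_guard(learn_prog, startup_prog):
    obs = layers.data(name='obs', shape=[OBS_DIM], dtype='float32')
    action = layers.data(name='act', shape=[1], dtype='int64')
    reward = layers.data(name='reward', shape=[], dtype='float32')
    next_obs = layers.data(name='next_obs', shape=[OBS_DIM], dtype='float32')
    terminal = layers.data(name='terminal', shape=[], dtype='bool')

    pred_value = layers.fc(obs, size=ACT_DIM)             # stand-in for self.model.value
    next_pred_value = layers.fc(next_obs, size=ACT_DIM)   # stand-in for the target model
    best_v = layers.reduce_max(next_pred_value, dim=1)
    best_v.stop_gradient = True
    target = reward + (
        1.0 - layers.cast(terminal, dtype='float32')) * GAMMA * best_v

    action_onehot = layers.cast(layers.one_hot(action, ACT_DIM), dtype='float32')
    pred_action_value = layers.reduce_sum(
        layers.elementwise_mul(action_onehot, pred_value), dim=1)
    cost = layers.reduce_mean(layers.square_error_cost(pred_action_value, target))
    fluid.optimizer.Adam(learning_rate=1e-3, epsilon=1e-3).minimize(cost)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup_prog)
batch = {
    'obs': np.random.rand(32, OBS_DIM).astype('float32'),
    'act': np.random.randint(ACT_DIM, size=(32, 1)).astype('int64'),
    'reward': np.random.rand(32).astype('float32'),
    'next_obs': np.random.rand(32, OBS_DIM).astype('float32'),
    'terminal': np.zeros(32, dtype='bool'),
}
print(exe.run(learn_prog, feed=batch, fetch_list=[cost])[0])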
# Excerpt from an ensemble actor-critic predictor; the enclosing method
# definition is not shown, and the loop header below is reconstructed from
# context (critic_outputs is accumulated once per ensemble member).
critic_outputs = []
for i in range(self.ensemble_num):
    critic_output = self.critics[i].predict(batch_obs, batch_actions)
    critic_output = layers.unsqueeze(critic_output, axes=[1])
    critic_outputs.append(critic_output)
score_matrix = layers.concat(critic_outputs, axis=1)

# Normalize the scores given by each critic so every critic votes with
# comparable weight.
sum_critic_score = layers.reduce_sum(
    score_matrix, dim=0, keep_dim=True)
sum_critic_score = layers.expand(
    sum_critic_score, expand_times=[self.ensemble_num, 1])
norm_score_matrix = score_matrix / sum_critic_score

# Average the normalized scores over critics and keep the candidate action
# that the ensemble rates highest.
actions_mean_score = layers.reduce_mean(
    norm_score_matrix, dim=1, keep_dim=True)
best_score_id = layers.argmax(actions_mean_score, axis=0)
best_score_id = layers.cast(best_score_id, dtype='int32')
ensemble_predict_action = layers.gather(batch_actions, best_score_id)
ensemble_predict_action = layers.squeeze(
    ensemble_predict_action, axes=[0])
return ensemble_predict_action
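# The same normalize-and-vote rule written out in NumPy, to make the shapes
# concrete. Rows of score_matrix are the candidate actions (one per actor in
# the ensemble), columns are the critics; the numbers are made up.
import numpy as np

score_matrix = np.array([[1.0, 4.0, 2.0],
                         [3.0, 2.0, 2.0],
                         [2.0, 2.0, 2.0]])
# Each critic's column is normalized so its scores sum to 1 ...
norm_score_matrix = score_matrix / score_matrix.sum(axis=0, keepdims=True)
# ... then the votes are averaged across critics for each candidate action.
actions_mean_score = norm_score_matrix.mean(axis=1)
best_score_id = int(actions_mean_score.argmax())
print(best_score_id)  # -> 1: the second actor's candidate action is selected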
def _critic_learn(self, obs, action, reward, next_obs, terminal, critic_lr,
                  model_id):
    # Target policy and target critic produce the bootstrapped Q-value.
    next_action = self.target_models[model_id].policy(next_obs)
    next_Q = self.target_models[model_id].value(next_obs, next_action)

    # Bellman target: r + gamma * Q_target(s', pi_target(s')), cut off at
    # terminal states and excluded from the gradient.
    terminal = layers.cast(terminal, dtype='float32')
    target_Q = reward + (1.0 - terminal) * self.gamma * next_Q
    target_Q.stop_gradient = True

    # Mean squared Bellman error for this ensemble member's critic.
    Q = self.models[model_id].value(obs, action)
    cost = layers.square_error_cost(Q, target_Q)
    cost = layers.reduce_mean(cost)
    optimizer = fluid.optimizer.AdamOptimizer(critic_lr)
    optimizer.minimize(cost)
    return cost
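# For context only: a matching DDPG-style actor update in the same fluid idiom.
# This is a sketch, not code from this source; get_actor_params() is an assumed
# helper returning the policy network's parameters, so that minimizing -Q does
# not also update the critic.
def _actor_learn(self, obs, actor_lr, model_id):
    action = self.models[model_id].policy(obs)
    Q = self.models[model_id].value(obs, action)
    # Gradient ascent on Q is implemented as gradient descent on -Q.
    cost = layers.reduce_mean(-1.0 * Q)
    optimizer = fluid.optimizer.AdamOptimizer(actor_lr)
    optimizer.minimize(
        cost, parameter_list=self.models[model_id].get_actor_params())
    return cost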
def _increment_exploration_counter(self):
    if self.explore:
        counter = self.exploration_counter()
        exploration_counter_ = counter + 1
        # switch == 1.0 once the incremented counter exceeds the budget.
        switch = layers.cast(
            x=(exploration_counter_ > self.total_exploration_batches),
            dtype="float32")
        # If the counter has already hit the limit, leave it unchanged;
        # otherwise write the incremented value back into the counter.
        layers.assign(
            switch * counter + (1 - switch) * exploration_counter_,
            counter)
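# A minimal sketch of how a persistent counter like the one above can be kept
# inside a fluid program; the variable name here is illustrative, not from the
# source. create_global_var returns a persistable tensor that layers.assign can
# overwrite in place on each pass through the program.
import paddle.fluid as fluid
from paddle.fluid import layers

exploration_counter = layers.create_global_var(
    shape=[1], value=0.0, dtype='float32', persistable=True,
    name='exploration_counter')
# Inside the program: read the counter, add one, and write the result back.
layers.assign(exploration_counter + 1.0, exploration_counter)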
# The opening of this snippet is cut off; the header below is reconstructed
# from the docstring and the asserts (the function name is a guess).
def select_entry(input, idx):
    """
    Given an input vector (Variable) and an idx (int or Variable),
    select the entry of the vector according to the idx.
    """
    assert isinstance(input, Variable)
    assert len(input.shape) == 2
    batch_size, num_entries = input.shape

    if isinstance(idx, int):
        # If idx is a constant int, wrap it in a Variable so it can be fed
        # to one_hot below.
        idx = layers.fill_constant(
            shape=[batch_size, 1], dtype="int64", value=idx)
    else:
        assert isinstance(idx, Variable)
    assert input.shape

    # Build a one-hot mask over the entries and take the row-wise dot product
    # with the input to pick out the selected entry.
    select = layers.cast(
        x=layers.one_hot(input=idx, depth=num_entries), dtype="float32")
    return inner_prod(select, input)
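# inner_prod is not defined in this excerpt; a minimal sketch of what it is
# assumed to do: the row-wise dot product of two [batch_size, num_entries]
# Variables, which picks out the selected entry when one operand is a one-hot
# mask.
from paddle.fluid import layers

def inner_prod(x, y):
    return layers.reduce_sum(layers.elementwise_mul(x, y), dim=1, keep_dim=True)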
# A variant of the ensemble selection above that scores the candidate actions
# with self.models[i].value instead of self.critics[i].predict and returns the
# gathered action without the final squeeze. The loop header is reconstructed
# from context, as before.
critic_outputs = []
for i in range(self.ensemble_num):
    critic_output = self.models[i].value(batch_obs, batch_actions)
    critic_output = layers.unsqueeze(critic_output, axes=[1])
    critic_outputs.append(critic_output)
score_matrix = layers.concat(critic_outputs, axis=1)

# Normalize the scores given by each critic.
sum_critic_score = layers.reduce_sum(
    score_matrix, dim=0, keep_dim=True)
sum_critic_score = layers.expand(
    sum_critic_score, expand_times=[self.ensemble_num, 1])
norm_score_matrix = score_matrix / sum_critic_score

# Average over critics and gather the highest-rated candidate action.
actions_mean_score = layers.reduce_mean(
    norm_score_matrix, dim=1, keep_dim=True)
best_score_id = layers.argmax(actions_mean_score, axis=0)
best_score_id = layers.cast(best_score_id, dtype='int32')
ensemble_predict_action = layers.gather(batch_actions, best_score_id)
return ensemble_predict_action