# Tests for gae(gamma, lam, rewards, Vs, last_V, reach_terminal); both list and
# torch.Tensor inputs are exercised. gae itself is provided by the package under test.
import numpy as np
import torch


def test_gae():
    # short trajectory that reaches a terminal state (the bootstrap value is masked out)
    rewards = [1, 2, 3]
    Vs = [0.1, 1.1, 2.1]
    assert np.allclose(gae(1.0, 0.5, rewards, Vs, 10, True), [3.725, 3.45, 0.9])
    assert np.allclose(gae(1.0, 0.5, rewards, torch.tensor(Vs), torch.tensor(10), True), [3.725, 3.45, 0.9])
    assert np.allclose(gae(0.1, 0.2, rewards, Vs, 10, True), [1.03256, 1.128, 0.9])
    assert np.allclose(gae(0.1, 0.2, rewards, torch.tensor(Vs), torch.tensor(10), True), [1.03256, 1.128, 0.9])

    rewards = [1, 2, 3]
    Vs = [0.5, 1.5, 2.5]
    assert np.allclose(gae(1.0, 0.5, rewards, Vs, 99, True), [3.625, 3.25, 0.5])
    assert np.allclose(gae(1.0, 0.5, rewards, torch.tensor(Vs), torch.tensor(99), True), [3.625, 3.25, 0.5])
    assert np.allclose(gae(0.1, 0.2, rewards, Vs, 99, True), [0.6652, 0.76, 0.5])
    assert np.allclose(gae(0.1, 0.2, rewards, torch.tensor(Vs), torch.tensor(99), True), [0.6652, 0.76, 0.5])

    # truncated trajectories: the advantages bootstrap from last_V
    rewards = [1, 2, 3, 4, 5]
    Vs = [0.5, 1.5, 2.5, 3.5, 4.5]
    assert np.allclose(gae(1.0, 0.5, rewards, Vs, 20, False), [6.40625, 8.8125, 11.625, 15.25, 20.5])
    assert np.allclose(gae(1.0, 0.5, rewards, torch.tensor(Vs), torch.tensor(20), False), [6.40625, 8.8125, 11.625, 15.25, 20.5])
    assert np.allclose(gae(0.1, 0.2, rewards, Vs, 20, False), [0.665348, 0.7674, 0.87, 1, 2.5])
    assert np.allclose(gae(0.1, 0.2, rewards, torch.tensor(Vs), torch.tensor(20), False), [0.665348, 0.7674, 0.87, 1, 2.5])

    rewards = [1, 2, 3, 4, 5]
    Vs = [0.1, 1.1, 2.1, 3.1, 4.1]
    assert np.allclose(gae(1.0, 0.5, rewards, Vs, 10, False), [5.80625, 7.6125, 9.225, 10.45, 10.9])
    assert np.allclose(gae(1.0, 0.5, rewards, torch.tensor(Vs), torch.tensor(10), False), [5.80625, 7.6125, 9.225, 10.45, 10.9])
    assert np.allclose(gae(0.1, 0.2, rewards, Vs, 10, False), [1.03269478, 1.1347393, 1.23696, 1.348, 1.9])
    assert np.allclose(gae(0.1, 0.2, rewards, torch.tensor(Vs), torch.tensor(10), False), [1.03269478, 1.1347393, 1.23696, 1.348, 1.9])

    # longer trajectory that reaches a terminal state
    rewards = [1, 2, 3, 4, 5, 6, 7, 8]
    Vs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
    assert np.allclose(gae(1.0, 0.5, rewards, Vs, 30, True), [5.84375, 7.6875, 9.375, 10.75, 11.5, 11., 8, 0.])
    assert np.allclose(gae(1.0, 0.5, rewards, torch.tensor(Vs), torch.tensor(30), True), [5.84375, 7.6875, 9.375, 10.75, 11.5, 11., 8, 0.])
    assert np.allclose(gae(0.1, 0.2, rewards, Vs, 30, True), [0.206164098, 0.308204915, 0.410245728, 0.5122864, 0.61432, 0.716, 0.8, 0])
    assert np.allclose(gae(0.1, 0.2, rewards, torch.tensor(Vs), torch.tensor(30), True), [0.206164098, 0.308204915, 0.410245728, 0.5122864, 0.61432, 0.716, 0.8, 0])
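
# For reference, the expected values in test_gae() follow the standard recursive GAE
# computation A_t = delta_t + gamma*lam*A_{t+1}, with delta_t = r_t + gamma*V_{t+1} - V_t,
# where the bootstrap value last_V is zeroed out when the trajectory reaches a terminal
# state. The function below is a minimal reference sketch (not the library's own
# implementation) that reproduces the assertions above, e.g.
# gae_reference(1.0, 0.5, [1, 2, 3], [0.1, 1.1, 2.1], 10, True) -> [3.725, 3.45, 0.9].
def gae_reference(gamma, lam, rewards, Vs, last_V, reach_terminal):
    Vs = np.asarray(Vs, dtype=np.float64)
    next_V = 0.0 if reach_terminal else float(last_V)  # no bootstrap past a terminal state
    advantages = np.zeros(len(rewards))
    running = 0.0
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma*next_V - Vs[t]
        running = delta + gamma*lam*running
        advantages[t] = running
        next_V = Vs[t]
    return advantages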

# Actor-critic update: bootstrapped returns, GAE advantages, and a combined
# policy/value/entropy loss over a batch of trajectories D.
def learn(self, D, **kwargs):
    logprobs = [torch.cat(traj.get_infos('action_logprob')) for traj in D]
    entropies = [torch.cat(traj.get_infos('entropy')) for traj in D]
    Vs = [torch.cat(traj.get_infos('V')) for traj in D]
    last_Vs = [traj.extra_info['last_info']['V'] for traj in D]
    Qs = [bootstrapped_returns(self.config['agent.gamma'], traj.rewards, last_V, traj.reach_terminal)
          for traj, last_V in zip(D, last_Vs)]
    As = [gae(self.config['agent.gamma'], self.config['agent.gae_lambda'], traj.rewards, V, last_V, traj.reach_terminal)
          for traj, V, last_V in zip(D, Vs, last_Vs)]
    # Metrics -> Tensor, device
    logprobs, entropies, Vs = map(lambda x: torch.cat(x).squeeze(), [logprobs, entropies, Vs])
    Qs, As = map(lambda x: torch.as_tensor(np.concatenate(x)).float().to(self.config.device), [Qs, As])
    if self.config['agent.standardize_adv']:
        As = (As - As.mean())/(As.std() + 1e-4)
    assert all([x.ndim == 1 for x in [logprobs, entropies, Vs, Qs, As]])
    # Loss
    policy_loss = -logprobs*As.detach()
    entropy_loss = -entropies
    value_loss = F.mse_loss(Vs, Qs, reduction='none')
    loss = policy_loss + self.config['agent.value_coef']*value_loss + self.config['agent.entropy_coef']*entropy_loss
    loss = loss.mean()
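
# The learn() methods on this page call bootstrapped_returns() and gae(), which are not
# shown here. As a rough sketch (assuming the argument order used above: gamma, rewards,
# last_V, reach_terminal), the bootstrapped return is the discounted reward-to-go,
# seeded with last_V only when the trajectory was truncated rather than terminated:
def bootstrapped_returns_reference(gamma, rewards, last_V, reach_terminal):
    running = 0.0 if reach_terminal else float(last_V)
    Qs = []
    for r in reversed(rewards):
        running = r + gamma*running
        Qs.append(running)
    return Qs[::-1]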

def learn(self, D, **kwargs):
    # Compute all metrics, D: list of Trajectory
    logprobs = [torch.cat(traj.get_all_info('action_logprob')) for traj in D]
    entropies = [torch.cat(traj.get_all_info('entropy')) for traj in D]
    Vs = [torch.cat(traj.get_all_info('V')) for traj in D]
    last_observations = torch.from_numpy(np.concatenate([traj.last_observation for traj in D], 0)).float()
    with torch.no_grad():
        last_Vs = self.V_head(self.feature_network(last_observations.to(self.device))).squeeze(-1)
    Qs = [bootstrapped_returns(self.config['agent.gamma'], traj, last_V)
          for traj, last_V in zip(D, last_Vs)]
    As = [gae(self.config['agent.gamma'], self.config['agent.gae_lambda'], traj, V, last_V)
          for traj, V, last_V in zip(D, Vs, last_Vs)]
    # Metrics -> Tensor, device
    logprobs, entropies, Vs = map(lambda x: torch.cat(x).squeeze(), [logprobs, entropies, Vs])
    Qs, As = map(lambda x: torch.from_numpy(np.concatenate(x).copy()).to(self.device), [Qs, As])
    if self.config['agent.standardize_adv']:
        As = (As - As.mean())/(As.std() + 1e-8)
    assert all([x.ndimension() == 1 for x in [logprobs, entropies, Vs, Qs, As]])
    # Loss
    policy_loss = -logprobs*As
    entropy_loss = -entropies
    value_loss = F.mse_loss(Vs, Qs, reduction='none')
    loss = policy_loss + self.config['agent.value_coef']*value_loss + self.config['agent.entropy_coef']*entropy_loss

def learn(self, D, **kwargs):
    # Compute all metrics, D: list of Trajectory
    logprobs = [torch.cat(traj.get_all_info('action_logprob')) for traj in D]
    entropies = [torch.cat(traj.get_all_info('entropy')) for traj in D]
    Vs = [torch.cat(traj.get_all_info('V')) for traj in D]
    last_observations = torch.from_numpy(np.concatenate([traj.last_observation for traj in D], 0)).float()
    with torch.no_grad():
        last_Vs = self.V_head(self.feature_network(last_observations.to(self.device))).squeeze(-1)
    Qs = [bootstrapped_returns(self.config['agent.gamma'], traj, last_V)
          for traj, last_V in zip(D, last_Vs)]
    As = [gae(self.config['agent.gamma'], self.config['agent.gae_lambda'], traj, V, last_V)
          for traj, V, last_V in zip(D, Vs, last_Vs)]
    # Metrics -> Tensor, device
    logprobs, entropies, Vs = map(lambda x: torch.cat(x).squeeze(), [logprobs, entropies, Vs])
    Qs, As = map(lambda x: torch.from_numpy(np.concatenate(x).copy()).to(self.device), [Qs, As])
    if self.config['agent.standardize_adv']:
        As = (As - As.mean())/(As.std() + 1e-8)
    assert all([x.ndimension() == 1 for x in [logprobs, entropies, Vs, Qs, As]])
    dataset = Dataset(D, logprobs, entropies, Vs, Qs, As)
    dataloader = DataLoader(dataset, self.config['train.batch_size'], shuffle=True)
    for epoch in range(self.config['train.num_epochs']):
        logs = [self.learn_one_update(data) for data in dataloader]
    self.total_timestep += sum([len(traj) for traj in D])

# Near-identical variant of the actor-critic update above; it differs only in how Qs and
# As are converted and moved to the device (tensorify(..., self.device)).
def learn(self, D, **kwargs):
    logprobs = [torch.cat(traj.get_infos('action_logprob')) for traj in D]
    entropies = [torch.cat(traj.get_infos('entropy')) for traj in D]
    Vs = [torch.cat(traj.get_infos('V')) for traj in D]
    last_Vs = [traj.extra_info['last_info']['V'] for traj in D]
    Qs = [bootstrapped_returns(self.config['agent.gamma'], traj.rewards, last_V, traj.reach_terminal)
          for traj, last_V in zip(D, last_Vs)]
    As = [gae(self.config['agent.gamma'], self.config['agent.gae_lambda'], traj.rewards, V, last_V, traj.reach_terminal)
          for traj, V, last_V in zip(D, Vs, last_Vs)]
    # Metrics -> Tensor, device
    logprobs, entropies, Vs = map(lambda x: torch.cat(x).squeeze(), [logprobs, entropies, Vs])
    Qs, As = map(lambda x: tensorify(np.concatenate(x).copy(), self.device), [Qs, As])
    if self.config['agent.standardize_adv']:
        As = (As - As.mean())/(As.std() + 1e-4)
    assert all([x.ndim == 1 for x in [logprobs, entropies, Vs, Qs, As]])
    # Loss
    policy_loss = -logprobs*As.detach()
    entropy_loss = -entropies
    value_loss = F.mse_loss(Vs, Qs, reduction='none')
    loss = policy_loss + self.config['agent.value_coef']*value_loss + self.config['agent.entropy_coef']*entropy_loss
    loss = loss.mean()

def learn(self, D, **kwargs):
    logprobs = [torch.cat(traj.get_infos('action_logprob')) for traj in D]
    entropies = [torch.cat(traj.get_infos('entropy')) for traj in D]
    Vs = [torch.cat(traj.get_infos('V')) for traj in D]
    with torch.no_grad():
        last_observations = tensorify([traj[-1].observation for traj in D], self.device)
        last_Vs = self.value(last_observations).squeeze(-1)
    Qs = [bootstrapped_returns(self.config['agent.gamma'], traj.rewards, last_V, traj.reach_terminal)
          for traj, last_V in zip(D, last_Vs)]
    As = [gae(self.config['agent.gamma'], self.config['agent.gae_lambda'], traj.rewards, V, last_V, traj.reach_terminal)
          for traj, V, last_V in zip(D, Vs, last_Vs)]
    # Metrics -> Tensor, device
    logprobs, entropies, Vs = map(lambda x: torch.cat(x).squeeze(), [logprobs, entropies, Vs])
    Qs, As = map(lambda x: tensorify(np.concatenate(x).copy(), self.device), [Qs, As])
    if self.config['agent.standardize_adv']:
        As = (As - As.mean())/(As.std() + 1e-4)
    assert all([x.ndim == 1 for x in [logprobs, entropies, Vs, Qs, As]])
    dataset = Dataset(D, logprobs, entropies, Vs, Qs, As)
    dataloader = DataLoader(dataset, self.config['train.batch_size'], shuffle=True)
    for epoch in range(self.config['train.num_epochs']):
        logs = [self.learn_one_update(data) for data in dataloader]
    self.total_timestep += sum([traj.T for traj in D])
    out = {}
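
# The epoch-based variants above hand the flattened per-timestep tensors to a custom
# Dataset and iterate over minibatches with DataLoader. Below is a minimal sketch of such
# a dataset built on torch.utils.data; the class name and the omission of D are
# assumptions for illustration, not the library's own Dataset class.
from torch.utils.data import Dataset as TorchDataset, DataLoader

class TimestepDataset(TorchDataset):
    def __init__(self, logprobs, entropies, Vs, Qs, As):
        # one entry per environment step, all tensors flattened to shape [N]
        self.tensors = (logprobs, entropies, Vs, Qs, As)
        assert all(t.shape == logprobs.shape for t in self.tensors)

    def __len__(self):
        return self.tensors[0].shape[0]

    def __getitem__(self, i):
        return tuple(t[i] for t in self.tensors)

# e.g. DataLoader(TimestepDataset(logprobs, entropies, Vs, Qs, As), batch_size=32, shuffle=True)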