import torch
from torch.distributions import Categorical

from lagom.networks import Module
from lagom.networks import CategoricalHead


def test_categorical_head(feature_dim, batch_size, num_action):
    action_head = CategoricalHead(feature_dim, num_action, torch.device('cpu'))
    assert isinstance(action_head, Module)
    assert action_head.feature_dim == feature_dim
    assert action_head.num_action == num_action
    assert action_head.device.type == 'cpu'
    dist = action_head(torch.randn(batch_size, feature_dim))
    assert isinstance(dist, Categorical)
    assert dist.batch_shape == (batch_size,)
    assert dist.probs.shape == (batch_size, num_action)
    x = dist.sample()
    assert x.shape == (batch_size,)
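# Hedged usage sketch (an editorial addition, not part of the original test): how a
# CategoricalHead is used on its own, based only on the constructor signature and the
# return type exercised above. The sizes (64 features, 4 actions, batch of 8) are arbitrary.
head = CategoricalHead(64, 4, torch.device('cpu'))
dist = head(torch.randn(8, 64))      # torch.distributions.Categorical
actions = dist.sample()              # shape: (8,)
log_probs = dist.log_prob(actions)   # shape: (8,)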
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from gym.spaces import flatdim

from lagom.networks import Module
from lagom.networks import make_fc
from lagom.networks import ortho_init
from lagom.networks import CategoricalHead
from lagom.networks import DiagGaussianHead
from lagom.networks import linear_lr_scheduler
from lagom.metric import bootstrapped_returns
from lagom.metric import gae
from lagom.transform import explained_variance as ev
from lagom.transform import describe
from torch.utils.data import DataLoader

from dataset import Dataset


class MLP(Module):
    def __init__(self, config, env, device, **kwargs):
        super().__init__(**kwargs)
        self.config = config
        self.env = env
        self.device = device

        self.feature_layers = make_fc(flatdim(env.observation_space), config['nn.sizes'])
        for layer in self.feature_layers:
            ortho_init(layer, nonlinearity='relu', constant_bias=0.0)
        self.layer_norms = nn.ModuleList([nn.LayerNorm(hidden_size) for hidden_size in config['nn.sizes']])

        self.to(self.device)

    def forward(self, x):
        for layer, layer_norm in zip(self.feature_layers, self.layer_norms):
            x = layer_norm(F.relu(layer(x)))
        return x
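# Hedged sketch (an editorial addition): one way the MLP feature extractor above could be
# paired with the imported CategoricalHead to form a discrete policy. It assumes env has a
# Discrete action space and that config['nn.sizes'][-1] is the feature dimension.
class CategoricalPolicy(Module):
    def __init__(self, config, env, device, **kwargs):
        super().__init__(**kwargs)
        self.feature_net = MLP(config, env, device)
        self.action_head = CategoricalHead(config['nn.sizes'][-1], env.action_space.n, device)

    def forward(self, x):
        # features -> torch.distributions.Categorical over discrete actions
        return self.action_head(self.feature_net(x))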
class Actor(Module):
    # The class header and the start of __init__ were missing from this snippet; they are
    # reconstructed from context (hidden sizes [400, 300] inferred from the 300-unit action head).
    def __init__(self, config, env, device, **kwargs):
        super().__init__(**kwargs)
        self.config = config
        self.env = env
        self.device = device
        self.feature_layers = make_fc(flatdim(env.observation_space), [400, 300])
        self.action_head = nn.Linear(300, flatdim(env.action_space))
        # Actions are assumed to have symmetric bounds: low == -high in every dimension.
        assert np.unique(env.action_space.high).size == 1
        assert -np.unique(env.action_space.low).item() == np.unique(env.action_space.high).item()
        self.max_action = env.action_space.high[0]
        self.to(self.device)

    def forward(self, x):
        for layer in self.feature_layers:
            x = F.relu(layer(x))
        x = self.max_action*torch.tanh(self.action_head(x))
        return x
class Critic(Module):
    def __init__(self, config, env, device, **kwargs):
        super().__init__(**kwargs)
        self.config = config
        self.env = env
        self.device = device

        self.feature_layers = make_fc(flatdim(env.observation_space) + flatdim(env.action_space), [400, 300])
        self.Q_head = nn.Linear(300, 1)

        self.to(self.device)

    def forward(self, x, action):
        x = torch.cat([x, action], dim=-1)
        for layer in self.feature_layers:
            x = F.relu(layer(x))
        x = self.Q_head(x)
        return x
class Critic(Module):
    def __init__(self, config, env, device, **kwargs):
        super().__init__(**kwargs)
        self.config = config
        self.env = env
        self.device = device

        # Q1
        self.first_feature_layers = make_fc(flatdim(env.observation_space) + flatdim(env.action_space), [256, 256])
        self.first_Q_head = nn.Linear(256, 1)
        # Q2
        self.second_feature_layers = make_fc(flatdim(env.observation_space) + flatdim(env.action_space), [256, 256])
        self.second_Q_head = nn.Linear(256, 1)

        self.to(self.device)
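    # Hedged sketch (an editorial addition, not in the original snippet): a forward pass for
    # the twin critic above, mirroring the single-head Critic.forward shown earlier. Both Q
    # estimates are returned so the caller can take the minimum, as in clipped double-Q learning.
    def forward(self, x, action):
        x = torch.cat([x, action], dim=-1)
        q1 = x
        for layer in self.first_feature_layers:
            q1 = F.relu(layer(q1))
        q1 = self.first_Q_head(q1)
        q2 = x
        for layer in self.second_feature_layers:
            q2 = F.relu(layer(q2))
        q2 = self.second_Q_head(q2)
        return q1, q2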
from torch.distributions.transforms import Transform


class TanhTransform(Transform):
    # The class header and the `atanh` helper were missing from this snippet; filled in from context.
    @staticmethod
    def atanh(x):
        return 0.5*(x.log1p() - (-x).log1p())

    def __eq__(self, other):
        return isinstance(other, TanhTransform)

    def _call(self, x):
        return x.tanh()

    def _inverse(self, y):
        return self.atanh(y)

    def log_abs_det_jacobian(self, x, y):
        # We use a formula that is more numerically stable, see details in the following link
        # https://github.com/tensorflow/probability/commit/ef6bb176e0ebd1cf6e25c6b5cecdd2428c22963f#diff-e120f70e92e6741bca649f04fcd907b7
        return 2. * (np.log(2.) - x - F.softplus(-2. * x))
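    # Added note (not in the original): why the formula above equals log|d tanh(x)/dx|.
    #   d tanh(x)/dx = 1 - tanh(x)^2 = sech(x)^2, and
    #   log sech(x)^2 = 2*(log 2 - x - log(1 + e^{-2x})) = 2*(log 2 - x - softplus(-2x)),
    # which avoids evaluating log(1 - y^2) for y = tanh(x) very close to +-1.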
from torch.distributions import Normal, Independent, TransformedDistribution


class Actor(Module):
    LOGSTD_MAX = 2
    LOGSTD_MIN = -20

    def __init__(self, config, env, device, **kwargs):
        super().__init__(**kwargs)
        self.config = config
        self.env = env
        self.device = device

        self.feature_layers = make_fc(flatdim(env.observation_space), [256, 256])
        self.mean_head = nn.Linear(256, flatdim(env.action_space))
        self.logstd_head = nn.Linear(256, flatdim(env.action_space))

        self.to(device)

    def forward(self, x):
        for layer in self.feature_layers:
            x = F.relu(layer(x))
        mean = self.mean_head(x)
        logstd = self.logstd_head(x)
        logstd = torch.tanh(logstd)
        logstd = self.LOGSTD_MIN + 0.5*(self.LOGSTD_MAX - self.LOGSTD_MIN)*(1 + logstd)
        std = torch.exp(logstd)
        dist = TransformedDistribution(Independent(Normal(mean, std), 1), [TanhTransform(cache_size=1)])
        return dist

    def mean_forward(self, x):
        for layer in self.feature_layers:
            x = F.relu(layer(x))
        mean = self.mean_head(x)
        return mean
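# Hedged usage sketch (an editorial addition): consuming the squashed Gaussian returned by
# Actor.forward above. The SimpleNamespace stand-in only provides the two spaces the
# constructor reads; it is illustrative and not lagom's environment API.
from types import SimpleNamespace
from gym.spaces import Box

_env = SimpleNamespace(observation_space=Box(low=-1., high=1., shape=(3,)),
                       action_space=Box(low=-1., high=1., shape=(1,)))
_actor = Actor(config={}, env=_env, device=torch.device('cpu'))
_dist = _actor(torch.randn(8, 3))
_action = _dist.rsample()          # reparameterized sample, squashed into (-1, 1)
_logp = _dist.log_prob(_action)    # log-density, tanh Jacobian included via TanhTransform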
# A second copy of these methods appeared in the source, identical except for a numerically
# safer _inverse that clamps its input away from +-1 before atanh. It is shown here as a
# subclass of the TanhTransform above; the subclassing and the name are editorial.
class ClampedTanhTransform(TanhTransform):
    def _inverse(self, y):
        eps = torch.finfo(y.dtype).eps
        return self.atanh(y.clamp(min=-1. + eps, max=1. - eps))
### Use together with the NormalizeAction wrapper
class TanhDiagGaussianHead(Module):
    r"""Defines a module for a tanh-squashed diagonal Gaussian (continuous) action distribution
    in which the standard deviation depends on the state.

    This is particularly useful for SAC, which maximizes a trade-off between reward and entropy,
    so the entropy must depend on the state. With a ReLU network, a randomly initialized network
    can produce very large values for logstd, making the policy either almost deterministic or so
    random that it cannot recover. Either case introduces numerical instability that can break
    the algorithm, so we constrain logstd to a fixed range.

    """
    def __init__(self, feature_dim, action_dim, device, **kwargs):
        super().__init__(**kwargs)
        self.feature_dim = feature_dim
        self.action_dim = action_dim
        self.device = device
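        # Hedged sketch (an editorial addition): the original snippet ends here, so the layers
        # and forward below only illustrate what the docstring describes, following the SAC
        # Actor pattern earlier in this file; they are not lagom's actual implementation.
        self.mean_head = nn.Linear(feature_dim, action_dim)
        self.logstd_head = nn.Linear(feature_dim, action_dim)
        self.to(self.device)

    def forward(self, x):
        LOGSTD_MIN, LOGSTD_MAX = -20, 2  # same clamp range as the Actor above
        mean = self.mean_head(x)
        logstd = torch.tanh(self.logstd_head(x))
        logstd = LOGSTD_MIN + 0.5*(LOGSTD_MAX - LOGSTD_MIN)*(1 + logstd)
        std = torch.exp(logstd)
        return TransformedDistribution(Independent(Normal(mean, std), 1), [TanhTransform(cache_size=1)])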
from gym import spaces

from lagom import BaseAgent
from lagom.utils import pickle_dump
from lagom.utils import numpify
from lagom.networks import Module
from lagom.networks import make_lnlstm
from lagom.networks import ortho_init
from lagom.networks import CategoricalHead
from lagom.networks import DiagGaussianHead
from lagom.networks import linear_lr_scheduler
from lagom.metric import bootstrapped_returns
from lagom.metric import gae
from lagom.transform import explained_variance as ev
from lagom.transform import describe


class FeatureNet(Module):
    def __init__(self, config, env, **kwargs):
        super().__init__(**kwargs)
        self.config = config
        self.env = env
        self.lstm = make_lnlstm(spaces.flatdim(env.observation_space), config['rnn.size'], num_layers=1)

    def forward(self, x, states):
        return self.lstm(x, states)


class Agent(BaseAgent):
    def __init__(self, config, env, **kwargs):
        super().__init__(config, env, **kwargs)
        feature_dim = config['rnn.size']