import collections
import copy
import time

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchx as tx
import torchx.layers as L
import torchx.nn as nnx

from surreal.utils.common import iter_last

# conv_fc_init, PPOModel, and GAILModel are provided elsewhere in the
# surreal codebase; they are used by the fragments below but not defined here.
def fc_layers(input_size, output_size, hiddens, initializer='xavier'):
    assert isinstance(hiddens, (list, tuple))
    fcs = nn.ModuleList()  # IMPORTANT for .cuda() to work!!
    layer_sizes = [input_size] + list(hiddens) + [output_size]
    for prev_size, next_size in zip(layer_sizes[:-1], layer_sizes[1:]):
        fcs.append(nn.Linear(prev_size, next_size))
    if initializer == 'xavier':
        conv_fc_init(fcs)
    else:
        raise ValueError('unsupported initializer: {}'.format(initializer))
    return fcs
class MLP(nnx.Module):
    def __init__(self, input_size, output_size, hiddens, activation=None):
        super().__init__()
        if activation is None:
            self.activation = F.relu
        else:
            raise NotImplementedError  # TODO: other activators
        self.layers = fc_layers(input_size=input_size,
                                output_size=output_size,
                                hiddens=hiddens)

    def reinitialize(self):
        conv_fc_init(self.layers)
    def forward(self, x):
        for is_last, fc in iter_last(self.layers):
            x = fc(x)
            if not is_last:  # no activation after the output layer
                x = self.activation(x)
        return x
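
# Usage sketch (not from the original source): a self-contained version of the
# same layer-stacking pattern using plain torch.nn, with nn.init.xavier_uniform_
# standing in for the library's conv_fc_init helper.
class TinyMLP(nn.Module):
    def __init__(self, input_size, output_size, hiddens):
        super().__init__()
        sizes = [input_size] + list(hiddens) + [output_size]
        self.layers = nn.ModuleList(
            nn.Linear(n_in, n_out) for n_in, n_out in zip(sizes[:-1], sizes[1:]))
        for fc in self.layers:
            nn.init.xavier_uniform_(fc.weight)  # stand-in for conv_fc_init
            nn.init.zeros_(fc.bias)

    def forward(self, x):
        for i, fc in enumerate(self.layers):
            x = fc(x)
            if i < len(self.layers) - 1:  # no activation after the output layer
                x = F.relu(x)
        return x

# TinyMLP(input_size=17, output_size=6, hiddens=[64, 64])(torch.randn(8, 17))
# -> tensor of shape (8, 6)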
self.clip_epsilon = self.clip_epsilon_init
self.clip_adjust_threshold = self.adjust_threshold
self.clip_upper = self.clip_range[1]
self.clip_lower = self.clip_range[0]
# learning rate setting:
self.min_lr = self.learner_config.algo.network.anneal.min_lr
self.lr_update_frequency = self.learner_config.algo.network.anneal.lr_update_frequency
self.frames_to_anneal = self.learner_config.algo.network.anneal.frames_to_anneal
num_updates = int(self.frames_to_anneal / self.learner_config.parameter_publish.exp_interval)
lr_scheduler = eval(self.learner_config.algo.network.anneal.lr_scheduler)
self.exp_counter = 0
self.kl_record = []
with tx.device_scope(self.gpu_option):
    self.model = PPOModel(
        obs_spec=self.obs_spec,
        action_dim=self.action_dim,
        model_config=self.learner_config.model,
        use_cuda=self.use_cuda,
        init_log_sig=self.init_log_sig,
        use_z_filter=self.use_z_filter,
        if_pixel_input=self.env_config.pixel_input,
        rnn_config=self.learner_config.algo.rnn,
    )
    self.ref_target_model = PPOModel(
        obs_spec=self.obs_spec,
        action_dim=self.action_dim,
        model_config=self.learner_config.model,
        use_cuda=self.use_cuda,
        init_log_sig=self.init_log_sig,
        use_z_filter=self.use_z_filter,
        if_pixel_input=self.env_config.pixel_input,
        rnn_config=self.learner_config.algo.rnn,
    )
def __init__(self, learner_config, env_config, session_config):
    # PPO setup
    super().__init__(learner_config, env_config, session_config)
    # GAIL-specific setup
    self.reward_lambda = self.learner_config.algo.reward_lambda  # reward mixing
    self.lr_discriminator = self.learner_config.algo.network.lr_discriminator
    self.epoch_discriminator = self.learner_config.algo.consts.epoch_discriminator
    self.stride = self.learner_config.algo.stride
    # learning rate setting:
    num_updates = int(self.frames_to_anneal / self.learner_config.parameter_publish.exp_interval)
    lr_scheduler = eval(self.learner_config.algo.network.anneal.lr_scheduler)
    with tx.device_scope(self.gpu_option):
        # TODO: what hypers does GAIL need? put them here ###
        # add a discriminator
        self.discriminator_model = GAILModel(
            obs_spec=self.obs_spec,
            action_dim=self.action_dim,
            model_config=self.learner_config.model,
            use_cuda=self.use_cuda,
            use_z_filter=self.use_z_filter
        )
        # Learning parameters and optimizer
        self.clip_discriminator_gradient = self.learner_config.algo.network.clip_discriminator_gradient
        self.discriminator_gradient_clip_value = self.learner_config.algo.network.discriminator_gradient_norm_clip
        self.discriminator_optim = torch.optim.Adam(
            self.discriminator_model.get_discriminator_params(),
            lr=self.lr_discriminator,
        )
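
# reward_lambda above is annotated as a reward-mixing coefficient. A hedged
# sketch of a GAIL-style blend of environment reward and discriminator
# (imitation) reward; the exact formula in surreal may differ, and
# mixed_reward is a hypothetical helper, not part of the library.
def mixed_reward(env_reward, discriminator_reward, reward_lambda):
    return reward_lambda * env_reward + (1.0 - reward_lambda) * discriminator_reward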
def _preprocess_batch_ppo(self, batch):
    '''
    Converts experiences from numpy arrays to torch.FloatTensors.
    Args:
        batch: BeneDict of experiences containing the following attributes
            'obs' - observation
            'actions' - actions
            'rewards' - rewards
            'obs_next' - next observation
            'dones' - termination flags
            'persistent_infos' - action policy
            'onetime_infos' - RNN hidden cells or None
    Returns:
        BeneDict of torch.FloatTensors
    '''
    with tx.device_scope(self.gpu_option):
        obs, actions, rewards, obs_next, dones, persistent_infos, onetime_infos = (
            batch['obs'],
            batch['actions'],
            batch['rewards'],
            batch['obs_next'],
            batch['dones'],
            batch['persistent_infos'],
            batch['onetime_infos'],
        )
        for modality in obs:
            for key in obs[modality]:
                obs[modality][key] = torch.tensor(obs[modality][key], dtype=torch.float32).detach()
                obs_next[modality][key] = torch.tensor(obs_next[modality][key], dtype=torch.float32).detach()
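
# The nested modality/key loop above, shown in isolation with a hypothetical
# two-modality batch (shapes are made up for illustration):
example_obs = {
    'pixel': {'camera0': np.zeros((8, 3, 84, 84), dtype=np.float32)},
    'low_dim': {'proprio': np.zeros((8, 17), dtype=np.float32)},
}
for modality in example_obs:
    for key in example_obs[modality]:
        example_obs[modality][key] = torch.tensor(
            example_obs[modality][key], dtype=torch.float32).detach()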
def act(self, obs):
    '''
    Args:
        obs: numpy array of shape (1, obs_dim)
    Returns:
        action_choice: sampled or max-likelihood action to input to env
        action_info: list of auxiliary information - [onetime, persistent]
            Note: this includes the probability distribution the action is
            sampled from and the RNN hidden states
    '''
    # Note: we collect two kinds of action info, one persistent and one onetime.
    # Persistent info is collected for every step in the rollout (i.e. the policy
    # probability distribution); onetime info is collected only for the first step
    # in a partial trajectory (i.e. the RNN hidden state).
    # See ExpSenderWrapperMultiStepMovingWindowWithInfo in exp_sender_wrapper for more.
    action_info = [[], []]
    with tx.device_scope(self.gpu_ids):
        obs_tensor = {}
        for mod in obs.keys():
            obs_tensor[mod] = {}
            for k in obs[mod].keys():
                obs_tensor[mod][k] = torch.tensor(obs[mod][k], dtype=torch.float32).unsqueeze(0)
        if self.rnn_config.if_rnn_policy:
            action_info[0].append(self.cells[0].squeeze(1).cpu().numpy())
            action_info[0].append(self.cells[1].squeeze(1).cpu().numpy())
        action_pd, self.cells = self.model.forward_actor_expose_cells(obs_tensor, self.cells)
        action_pd = action_pd.detach().cpu().numpy()
        action_pd[:, self.action_dim:] *= np.exp(self.noise)
        if self.agent_mode not in ['eval_deterministic', 'eval_deterministic_local']:
            action_choice = self.pd.sample(action_pd)
        else:
            # deterministic eval: take the max-likelihood action instead of sampling
            action_choice = self.pd.maxprob(action_pd)
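
# Self-contained sketch of the sample-versus-deterministic split above,
# assuming (as the slicing of action_pd suggests) that means and standard
# deviations are packed along the last axis. choose_action is a hypothetical
# helper, not part of surreal.
def choose_action(action_pd, action_dim, deterministic):
    mean, std = action_pd[:, :action_dim], action_pd[:, action_dim:]
    if deterministic:
        return mean  # max-likelihood action of a diagonal Gaussian
    return mean + std * np.random.randn(*mean.shape)  # sampled action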
def _optimize(self, obs, actions, rewards, obs_next, persistent_infos, onetime_infos, dones):
    '''
    Main method for optimization. Calls _adapt/clip_update and _value_update
    epoch_policy and epoch_baseline times respectively.
    Args:
        obs: batch of observations (batch_size, N-step, obs_dim)
        obs_next: batch of next observations (batch_size, 1, obs_dim)
        actions: batch of actions (batch_size, N-step, act_dim)
        rewards: batch of rewards (batch_size, N-step)
        dones: batch of termination flags (batch_size, N-step)
        persistent_infos, onetime_infos: other batched attributes tracked,
            such as the behavior policy and RNN hidden states
    Returns:
        dictionary of recorded statistics
    '''
    # convert everything to float tensors:
    with tx.device_scope(self.gpu_option):
        pds = persistent_infos[-1]
        if self.if_rnn_policy:
            h = (onetime_infos[0].transpose(0, 1).contiguous()).detach()
            c = (onetime_infos[1].transpose(0, 1).contiguous()).detach()
            self.cells = (h, c)
        advantages, returns = self._gae_and_return(obs,
                                                   obs_next,
                                                   rewards,
                                                   dones)
        advantages = advantages.detach()
        returns = returns.detach()
        if self.if_rnn_policy:
            h = self.cells[0].detach()
            c = self.cells[1].detach()
            self.cells = (h, c)
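
# _gae_and_return is not shown in this snippet. A minimal, self-contained
# sketch of generalized advantage estimation over an N-step batch; surreal's
# actual implementation (shapes, bootstrapping details) may differ.
def gae_and_return(rewards, values, next_value, dones, gamma=0.99, lam=0.97):
    # rewards, dones, values: float tensors of (batch, N); next_value: (batch, 1)
    values = torch.cat([values, next_value], dim=1)  # bootstrap value: (batch, N + 1)
    advantages = torch.zeros_like(rewards)
    running = torch.zeros(rewards.shape[0])
    for t in reversed(range(rewards.shape[1])):
        not_done = 1.0 - dones[:, t]
        delta = rewards[:, t] + gamma * not_done * values[:, t + 1] - values[:, t]
        running = delta + gamma * lam * not_done * running
        advantages[:, t] = running
    returns = advantages + values[:, :-1]
    return advantages, returns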
def act(self, obs):
    with tx.device_scope(self.gpu_ids):
        if self.sleep_time > 0.0:
            time.sleep(self.sleep_time)
        if not self.frame_stack_concatenate_on_env:
            # The environment emits pixels as a list of frames;
            # concatenate the frames into a single numpy array.
            obs = copy.deepcopy(obs)
            if 'pixel' in obs:
                for key in obs['pixel']:
                    obs['pixel'][key] = np.concatenate(obs['pixel'][key], axis=0)
        # Convert to pytorch tensors
        obs_tensor = collections.OrderedDict()
        for modality in obs:
            modality_dict = collections.OrderedDict()
            for key in obs[modality]:
                modality_dict[key] = torch.tensor(obs[modality][key], dtype=torch.float32).unsqueeze(0)
            obs_tensor[modality] = modality_dict
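
# The frame-concatenation step above, made concrete with a hypothetical
# 4-frame stack of (3, 84, 84) images:
frames = [np.zeros((3, 84, 84), dtype=np.float32) for _ in range(4)]
stacked = np.concatenate(frames, axis=0)  # -> single array of shape (12, 84, 84)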
def __init__(self, D_in, D_act, hidden_sizes=[400, 300], use_layernorm=True):
    super(CriticNetworkX, self).__init__()
    # Stage 1: process the observation alone
    xp_input_obs = L.Placeholder((None, D_in))
    xp = L.Linear(hidden_sizes[0])(xp_input_obs)
    xp = L.ReLU()(xp)
    if use_layernorm:
        xp = L.LayerNorm(1)(xp)
    self.model_obs = L.Functional(inputs=xp_input_obs, outputs=xp)
    self.model_obs.build((None, D_in))
    # Stage 2: concatenate the action with the stage-1 features
    xp_input_concat = L.Placeholder((None, hidden_sizes[0] + D_act))
    xp = L.Linear(hidden_sizes[1])(xp_input_concat)
    xp = L.ReLU()(xp)
    if use_layernorm:
        xp = L.LayerNorm(1)(xp)
    xp = L.Linear(1)(xp)
    self.model_concat = L.Functional(inputs=xp_input_concat, outputs=xp)
    self.model_concat.build((None, D_act + hidden_sizes[0]))
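
# For readers unfamiliar with torchx's functional layers, an equivalent
# plain-PyTorch sketch of the same two-stage critic: the observation is
# processed alone, then the action is concatenated in before the second
# layer. TwoStageCritic is an illustrative name, not part of surreal.
class TwoStageCritic(nn.Module):
    def __init__(self, d_obs, d_act, hidden_sizes=(400, 300), use_layernorm=True):
        super().__init__()
        obs_layers = [nn.Linear(d_obs, hidden_sizes[0]), nn.ReLU()]
        if use_layernorm:
            obs_layers.append(nn.LayerNorm(hidden_sizes[0]))
        self.model_obs = nn.Sequential(*obs_layers)
        concat_layers = [nn.Linear(hidden_sizes[0] + d_act, hidden_sizes[1]), nn.ReLU()]
        if use_layernorm:
            concat_layers.append(nn.LayerNorm(hidden_sizes[1]))
        concat_layers.append(nn.Linear(hidden_sizes[1], 1))
        self.model_concat = nn.Sequential(*concat_layers)

    def forward(self, obs, act):
        h = self.model_obs(obs)
        return self.model_concat(torch.cat([h, act], dim=1))

# TwoStageCritic(d_obs=17, d_act=6)(torch.randn(8, 17), torch.randn(8, 6))
# -> Q-value tensor of shape (8, 1)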
def __init__(self, D_obs, hidden_sizes=[64, 64]):
    '''
    Constructor for PPO critic network
    Args:
        D_obs: observation space dimension, scalar
        hidden_sizes: list of fully connected layer dimensions
    '''
    super(PPO_CriticNetwork, self).__init__()
    # assumes D_obs here is the correct RNN hidden dim if necessary
    xp_input = L.Placeholder((None, D_obs))
    xp = L.Linear(hidden_sizes[0])(xp_input)
    xp = L.ReLU()(xp)
    xp = L.Linear(hidden_sizes[1])(xp)
    xp = L.ReLU()(xp)
    xp = L.Linear(1)(xp)
    self.model = L.Functional(inputs=xp_input, outputs=xp)
    self.model.build((None, D_obs))
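
# The same value head in plain PyTorch, using an illustrative D_obs of 64 and
# the default [64, 64] hidden sizes:
value_head = nn.Sequential(
    nn.Linear(64, 64), nn.ReLU(),
    nn.Linear(64, 64), nn.ReLU(),
    nn.Linear(64, 1),
)
# value_head(torch.randn(8, 64)) -> state-value tensor of shape (8, 1)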