# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_is_empty(case, expected):
    '''Parametrized check that pydash is_empty returns the expected flag for each case.'''
    result = _.is_empty(case)
    assert result == expected
def get_lr_scheduler(optim, lr_scheduler_spec):
    '''
    Helper to parse lr_scheduler param and construct Pytorch optim.lr_scheduler

    @param optim: torch optimizer whose learning rate the scheduler will drive
    @param lr_scheduler_spec: dict spec with key 'name' naming a torch.optim.lr_scheduler class (or the custom 'LinearToZero'), plus that class's kwargs; an empty spec yields a no-op scheduler
    @returns the constructed lr_scheduler instance
    '''
    if ps.is_empty(lr_scheduler_spec):
        # no spec: scheduler that leaves the learning rate untouched
        lr_scheduler = NoOpLRScheduler(optim)
    elif lr_scheduler_spec['name'] == 'LinearToZero':
        # custom spec: anneal lr linearly from its initial value to 0 over 'frame' frames
        # (direct attribute access; the original getattr with a literal string was needless indirection)
        frame = float(lr_scheduler_spec['frame'])
        lr_scheduler = torch.optim.lr_scheduler.LambdaLR(optim, lr_lambda=lambda x: 1 - x / frame)
    else:
        # dynamic lookup of the named scheduler class; remaining spec keys are its kwargs
        LRSchedulerClass = getattr(torch.optim.lr_scheduler, lr_scheduler_spec['name'])
        lr_scheduler_spec = ps.omit(lr_scheduler_spec, 'name')
        lr_scheduler = LRSchedulerClass(optim, **lr_scheduler_spec)
    return lr_scheduler
def flatten_dict(obj, delim='.'):
    '''Missing pydash method to flatten dict'''
    flat = {}
    for key, val in obj.items():
        if ps.is_dict(val) and not ps.is_empty(val):
            # recurse into non-empty nested dicts, joining key paths with delim
            for sub_key, sub_val in flatten_dict(val, delim).items():
                flat[key + delim + sub_key] = sub_val
        elif ps.is_list(val) and not ps.is_empty(val) and ps.is_dict(val[0]):
            # expand a list of dicts into index-suffixed keys, re-flattening as entries land
            for idx, item in enumerate(val):
                flat[key + delim + str(idx)] = item
                if ps.is_object(item):
                    flat = flatten_dict(flat, delim)
        else:
            # scalars, empty containers, and plain lists pass through unchanged
            flat[key] = val
    return flat
def build_model_tails(self, out_dim, out_layer_activation):
    '''Build each model_tail. These are stored as Sequential models in model_tails'''
    # broadcast a single activation to every output head
    if not ps.is_list(out_layer_activation):
        out_layer_activation = [out_layer_activation] * len(out_dim)
    model_tails = nn.ModuleList()
    if ps.is_empty(self.tail_hid_layers):
        # no per-tail hidden layers: one fc layer from the body output to each out_dim
        for tail_out_dim, tail_activation in zip(out_dim, out_layer_activation):
            model_tails.append(net_util.build_fc_model([self.body_hid_layers[-1], tail_out_dim], tail_activation))
    else:
        assert len(self.tail_hid_layers) == len(out_dim), 'Hydra tail hid_params inconsistent with number out dims'
        # each tail gets its own hidden fc stack followed by its output layer
        for tail_out_dim, tail_activation, tail_hid_layers in zip(out_dim, out_layer_activation, self.tail_hid_layers):
            model_tail = net_util.build_fc_model(tail_hid_layers, self.hid_layers_activation)
            out_layer = net_util.build_fc_model([tail_hid_layers[-1], tail_out_dim], tail_activation)
            model_tail.add_module(str(len(model_tail)), out_layer)
            model_tails.append(model_tail)
    return model_tails
'bidirectional',
'seq_len',
'init_fn',
'clip_grad_val',
'loss_spec',
'optim_spec',
'lr_scheduler_spec',
'update_type',
'update_frequency',
'polyak_coef',
'gpu',
])
# restore proper in_dim from env stacked state_dim (stack_len, *raw_state_dim)
self.in_dim = in_dim[1:] if len(in_dim) > 2 else in_dim[1]
# fc body: state processing model
if ps.is_empty(self.fc_hid_layers):
self.rnn_input_dim = self.in_dim
else:
fc_dims = [self.in_dim] + self.fc_hid_layers
self.fc_model = net_util.build_fc_model(fc_dims, self.hid_layers_activation)
self.rnn_input_dim = fc_dims[-1]
# RNN model
self.rnn_model = getattr(nn, net_util.get_nn_name(self.cell_type))(
input_size=self.rnn_input_dim,
hidden_size=self.rnn_hidden_size,
num_layers=self.rnn_num_layers,
batch_first=True, bidirectional=self.bidirectional)
# tails. avoid list for single-tail for compute speed
if ps.is_integer(self.out_dim):
self.model_tail = net_util.build_fc_model([self.rnn_hidden_size, self.out_dim], self.out_layer_activation)
def calc_df_row(self, env):
'''Calculate a row for updating train_df or eval_df.'''
frame = self.env.clock.get('frame')
wall_t = env.clock.get_elapsed_wall_t()
fps = 0 if wall_t == 0 else frame / wall_t
# update debugging variables
if net_util.to_check_train_step():
grad_norms = net_util.get_grad_norms(self.agent.algorithm)
self.mean_grad_norm = np.nan if ps.is_empty(grad_norms) else np.mean(grad_norms)
row = pd.Series({
# epi and frame are always measured from training env
'epi': self.env.clock.get('epi'),
# t and reward are measured from a given env or eval_env
't': env.clock.get('t'),
'wall_t': wall_t,
'opt_step': self.env.clock.get('opt_step'),
'frame': frame,
'fps': fps,
'total_reward': np.nanmean(self.total_reward), # guard for vec env
'total_reward_ma': np.nan, # update outside
'loss': self.loss,
'lr': self.get_mean_lr(),
'explore_var': self.explore_var,
'entropy_coef': self.entropy_coef if hasattr(self, 'entropy_coef') else np.nan,
def calc_df_row(self, env):
'''Calculate a row for updating train_df or eval_df.'''
frame = self.env.clock.get('frame')
wall_t = env.clock.get_elapsed_wall_t()
fps = 0 if wall_t == 0 else frame / wall_t
# update debugging variables
if net_util.to_check_train_step():
grad_norms = net_util.get_grad_norms(self.agent.algorithm)
self.mean_grad_norm = np.nan if ps.is_empty(grad_norms) else np.mean(grad_norms)
row = pd.Series({
# epi and frame are always measured from training env
'epi': self.env.clock.get('epi'),
# t and reward are measured from a given env or eval_env
't': env.clock.get('t'),
'wall_t': wall_t,
'opt_step': self.env.clock.get('opt_step'),
'frame': frame,
'fps': fps,
'total_reward': np.nanmean(self.total_reward), # guard for vec env
'total_reward_ma': np.nan, # update outside
'loss': self.loss,
'lr': self.get_mean_lr(),
'explore_var': self.explore_var,
'entropy_coef': self.entropy_coef if hasattr(self, 'entropy_coef') else np.nan,
def calc_df_row(self, env):
'''Calculate a row for updating train_df or eval_df.'''
frame = self.env.clock.get('frame')
wall_t = env.clock.get_elapsed_wall_t()
fps = 0 if wall_t == 0 else frame / wall_t
# update debugging variables
if net_util.to_check_train_step():
grad_norms = net_util.get_grad_norms(self.agent.algorithm)
self.mean_grad_norm = np.nan if ps.is_empty(grad_norms) else np.mean(grad_norms)
row = pd.Series({
# epi and frame are always measured from training env
'epi': self.env.clock.get('epi'),
# t and reward are measured from a given env or eval_env
't': env.clock.get('t'),
'wall_t': wall_t,
'opt_step': self.env.clock.get('opt_step'),
'frame': frame,
'fps': fps,
'total_reward': np.nanmean(self.total_reward), # guard for vec env
'avg_return': np.nan, # update outside
'avg_len': np.nan, # update outside
'avg_success': np.nan, # update outside
'loss': self.loss,
'lr': self.get_mean_lr(),