Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Experiment bootstrap: logging setup, environment construction and seeding.
# NOTE(review): indentation was lost in this listing, so the extent of the
# `if`/`else` bodies below cannot be read off the text — confirm the nesting
# against the original script before re-indenting.

# Run identifier built from the gym id, experiment name, seed and the current
# Unix time (truncated to an int).
experiment_name = f"{args.gym_id}__{args.exp_name}__{args.seed}__{int(time.time())}"
# Summary writer logging under runs/<experiment_name>.
writer = SummaryWriter(f"runs/{experiment_name}")
# Record every entry of vars(args) as a Markdown table row under the
# 'hyperparameters' tag.
writer.add_text('hyperparameters', "|param|value|\n|-|-|\n%s" % (
'\n'.join([f"|{key}|{value}|" for key, value in vars(args).items()])))
# Production mode: presumably the wandb import/init, the /tmp writer and the
# wandb.save call all belong to this `if` — TODO confirm.
if args.prod_mode:
import wandb
wandb.init(project=args.wandb_project_name, entity=args.wandb_entity, tensorboard=True, config=vars(args), name=experiment_name, monitor_gym=True)
# The writer is replaced by one that logs under /tmp/<experiment_name>.
writer = SummaryWriter(f"/tmp/{experiment_name}")
# Hand this script's absolute path to wandb.save.
wandb.save(os.path.abspath(__file__))
# TRY NOT TO MODIFY: seeding
# Use CUDA only when it is both available and requested via args.cuda.
device = torch.device('cuda' if torch.cuda.is_available() and args.cuda else 'cpu')
env = gym.make(args.gym_id)
# respect the default timelimit
assert isinstance(env.action_space, MultiDiscrete), "only MultiDiscrete action space is supported"
# Either the env is already a TimeLimit or a non-zero --episode-length is given.
assert isinstance(env, TimeLimit) or int(args.episode_length), "the gym env does not have a built in TimeLimit, please specify by using --episode-length"
# For a TimeLimit env, a non-zero episode length overrides _max_episode_steps
# and args.episode_length is set from the env; otherwise the env gets wrapped
# in a new TimeLimit.
if isinstance(env, TimeLimit):
if int(args.episode_length):
env._max_episode_steps = int(args.episode_length)
args.episode_length = env._max_episode_steps
else:
env = TimeLimit(env, int(args.episode_length))
# NormalizedEnv receives env.env (not env itself); the result is then wrapped
# in TimeLimit again on the following line.
env = NormalizedEnv(env.env, ob=args.norm_obs, ret=args.norm_returns, clipob=args.obs_clip, cliprew=args.rew_clip, gamma=args.gamma)
env = TimeLimit(env, int(args.episode_length))
# Seed Python's random, NumPy, torch, the env and both of its spaces with
# args.seed; cuDNN determinism follows args.torch_deterministic.
random.seed(args.seed)
np.random.seed(args.seed)
torch.manual_seed(args.seed)
torch.backends.cudnn.deterministic = args.torch_deterministic
env.seed(args.seed)
env.action_space.seed(args.seed)
env.observation_space.seed(args.seed)
# Set up the action/observation spaces, the starting board and the two
# players, then roll for both players until their totals differ.
# NOTE(review): the listing appears to end before the method does (the roll
# totals are not used in the visible lines) and its indentation was lost.
def __init__(self, player1, player2, higher_starts=True):
# MultiDiscrete action space built from [5, 25, 25].
self.action_space = spaces.MultiDiscrete([5, 25, 25])
# MultiDiscrete observation space built from 24 copies of [3, 16].
self.observation_space = spaces.MultiDiscrete([[3, 16], ]*24)
# Starting board: 24 two-element rows.
# presumably each row is (owner, piece count) for one board point — TODO confirm
self.board = np.array([[2, 2], [0, 0], [0, 0], [0, 0], [0, 0], [1, 5],
[0, 0], [1, 3], [0, 0], [0, 0], [0, 0], [2, 5],
[1, 5], [0, 0], [0, 0], [0, 0], [2, 3], [0, 0],
[2, 5], [0, 0], [0, 0], [0, 0], [0, 0], [1, 2]])
self.player1 = player1
self.player2 = player2
self.higher_starts = higher_starts
# Each player's total is the sum of one roll_dice() result.
player1_roll = np.sum(roll_dice())
player2_roll = np.sum(roll_dice())
# Re-roll both players while the totals are tied.
while player1_roll == player2_roll:
player1_roll = np.sum(roll_dice())
player2_roll = np.sum(roll_dice())
def analyze_action_space(action_space):
    """Returns the number of controls and actions for an action space.

    Args:
      action_space: A gym.spaces.Discrete or gym.spaces.MultiDiscrete space.

    Returns:
      A tuple (n_controls, n_actions). For Discrete this is
      (1, action_space.n); for MultiDiscrete it is the length of nvec and
      its first entry.

    Raises:
      AssertionError: If the space is of another type, if a MultiDiscrete
        space has no controls, or if its controls do not all have the same
        number of actions.
      ValueError: If a MultiDiscrete space's nvec is not one-dimensional
        (its shape cannot be unpacked into a single length).
    """
    assert isinstance(
        action_space, (gym.spaces.Discrete, gym.spaces.MultiDiscrete)
    ), 'Action space expected to be Discrete or MultiDiscrete, got {}.'.format(
        type(action_space)
    )
    if isinstance(action_space, gym.spaces.Discrete):
        # A Discrete space is a single control with n actions.
        return (1, action_space.n)

    nvec = action_space.nvec
    (n_controls,) = nvec.shape
    assert n_controls > 0, 'MultiDiscrete action space must have a control.'
    # A single n_actions is returned, so all controls must agree on it.
    assert onp.min(nvec) == onp.max(nvec), (
        'Every control must have the same number of actions.'
    )
    return (n_controls, nvec[0])
# Build a space of the same kind as `space`, extended by `history_len`.
# NOTE(review): the listing appears truncated — no fallback branch and no
# return statement are visible even though the signature is annotated to
# return spaces.Space — and its indentation was lost.
def extend_space(space, history_len) -> spaces.Space:
# Helper: stack `history_len` copies of the array along a new leading axis.
def _extend_to_history_len(np_array):
return np.concatenate(
history_len * [np.expand_dims(np_array, 0)], axis=0
)
# Discrete: becomes MultiDiscrete([history_len, space.n]).
if isinstance(space, spaces.Discrete):
result = spaces.MultiDiscrete([history_len, space.n])
# MultiDiscrete: a column filled with history_len is placed before nvec.
# NOTE(review): np.hstack needs its inputs to have matching dimensionality and
# the first operand here is 2-D (rows, 1), so this presumably expects a 2-D
# space.nvec — confirm against the gym version in use.
elif isinstance(space, spaces.MultiDiscrete):
nvec = np.hstack(
(history_len * np.ones((space.nvec.shape[0], 1)), space.nvec)
)
result = spaces.MultiDiscrete(nvec)
# Box: low/high bounds are repeated history_len times along a new first axis;
# the dtype is carried over.
elif isinstance(space, spaces.Box):
result = spaces.Box(
low=_extend_to_history_len(space.low),
high=_extend_to_history_len(space.high),
# shape=(history_len,) + space.shape,
dtype=space.dtype
)
# Tuple: every member space is extended recursively and re-wrapped in a Tuple.
elif isinstance(space, spaces.Tuple):
result = []
for value in space.spaces:
result.append(extend_space(value, history_len))
result = spaces.Tuple(result)
# NOTE(review): fragment of a method body — the enclosing `def` is not part of
# this listing and the indentation was lost.
self._base_validate_config()
# Read the configured frame skip and clamp it to at least 1.
self.frame_skip = self.config['FRAME_SKIP']
if self.frame_skip < 1:
self.frame_skip = 1
# Start the controller server; the result is unpacked into the server and its
# thread.
self.controller_server, self.controller_server_thread = self._start_controller_server()
# Start the emulator with the ROM name, graphics plugin and input driver path
# from the config; the result is unpacked into the xvfb and emulator processes.
self.xvfb_process, self.emulator_process = \
self._start_emulator(rom_name=self.config['ROM_NAME'],
gfx_plugin=self.config['GFX_PLUGIN'],
input_driver_path=self.config['INPUT_DRIVER_PATH'])
# Menu navigation runs inside the server's frame_skip_disabled() context.
with self.controller_server.frame_skip_disabled():
self._navigate_menu()
# Observations: Box with bounds 0..255 and shape (SCR_H, SCR_W, SCR_D).
self.observation_space = \
spaces.Box(low=0, high=255, shape=(SCR_H, SCR_W, SCR_D))
# Actions: MultiDiscrete built from 16 two-element pairs — two joystick axes
# with [-80, 80] and fourteen buttons with [0, 1].
self.action_space = spaces.MultiDiscrete([[-80, 80], # Joystick X-axis
[-80, 80], # Joystick Y-axis
[ 0, 1], # A Button
[ 0, 1], # B Button
[ 0, 1], # RB Button
[ 0, 1], # LB Button
[ 0, 1], # Z Button
[ 0, 1], # C Right Button
[ 0, 1], # C Left Button
[ 0, 1], # C Down Button
[ 0, 1], # C Up Button
[ 0, 1], # D-Pad Right Button
[ 0, 1], # D-Pad Left Button
[ 0, 1], # D-Pad Down Button
[ 0, 1], # D-Pad Up Button
[ 0, 1], # Start Button
])
def hard_update(source_vars: Sequence[tf.Variable], target_vars: Sequence[tf.Variable]) -> None:
    """Overwrite the target variables with the source variables.

    Implemented as a soft update whose tau is 1.

    Arguments:
        source_vars {Sequence[tf.Variable]} -- Variables whose values are copied
        target_vars {Sequence[tf.Variable]} -- Variables that receive the values
    """
    # With tau at 1 the update takes everything from source and keeps nothing
    # from target.
    full_tau = 1.0
    soft_update(source_vars, target_vars, full_tau)
def flatten_list(l: List[List]):
    """Concatenate the inner lists of *l* into one flat list (one level deep)."""
    return [item for inner in l for item in inner]
# Lookup from a space class to the short string label used for that kind of
# space (note that Box is labelled "continuous").
spaces_mapping = {
    Discrete: "discrete",
    MultiDiscrete: "multidiscrete",
    Box: "continuous",
    MultiBinary: "multibinary",
}
def __init__(self, level):
    """Create the environment state for the given level.

    Builds the DoomGame and Loader objects, sets the default mode and
    rendering flags, defines the action space and the allowed actions, and
    finally calls self._seed() and self._configure().
    """
    utils.EzPickle.__init__(self)

    # Level bookkeeping: no previous level yet.
    self.previous_level = -1
    self.level = level

    self.game = DoomGame()
    self.loader = Loader()

    # Directory that contains this source file.
    this_file = os.path.abspath(__file__)
    self.doom_dir = os.path.dirname(this_file)

    # Mode is one of 'human', 'fast' or 'normal'.
    self.mode = 'fast'
    # Set to True to disable double rendering in human mode.
    self.no_render = False
    self.viewer = None
    # Becomes True once reset() has been called.
    self.is_initialized = False
    self.curr_seed = 0

    # 43 [min, max] pairs in total: 38 of [0, 1], 2 of [-10, 10] and
    # 3 of [-100, 100], in that order.
    binary_ranges = [[0, 1]] * 38
    narrow_ranges = [[-10, 10]] * 2
    wide_ranges = [[-100, 100]] * 3
    self.action_space = spaces.MultiDiscrete(
        binary_ranges + narrow_ranges + wide_ranges
    )
    self.allowed_actions = list(range(NUM_ACTIONS))

    self._seed()
    self._configure()
# NOTE(review): the listing appears truncated — neither `unflatten` nor the
# outer function has a visible return — and its indentation was lost.
def flatten_space(tuple_space):
"""Flattens a Tuple of like-spaces into a single bigger space of the appropriate type.
The spaces do not have to have the same shape, but do need to be of compatible types.
For example, we can flatten a (Box(10), Box(5)) into Box(15) or a (Discrete(2), Discrete(2))
into a MultiDiscrete([2, 2]), but cannot flatten a (Box(10), Discrete(2))."""
# Collect the distinct classes of the member spaces; mixed classes are rejected.
unique_types = set(type(space) for space in tuple_space.spaces)
if len(unique_types) > 1:
raise TypeError(f"Cannot flatten a space with more than one type: {unique_types}")
uniq_type = unique_types.pop()
# NOTE(review): uniq_type is a class object (it came from type(space)), so the
# isinstance() checks below test the class object itself rather than a space
# instance; issubclass() may have been intended — confirm.
# Discrete members: one MultiDiscrete built from each member's n; flatten and
# unflatten are both the identity.
if isinstance(uniq_type, gym.spaces.Discrete):
flat_space = gym.spaces.MultiDiscrete([space.n for space in tuple_space.spaces])
flatten = unflatten = lambda x: x
# MultiDiscrete members: one MultiDiscrete built from the list of member nvecs.
elif isinstance(uniq_type, gym.spaces.MultiDiscrete):
flat_space = gym.spaces.MultiDiscrete([space.nvec for space in tuple_space.spaces])
flatten = unflatten = lambda x: x
# Box members: lows and highs are joined along axis 0 into one Box.
# NOTE(review): the `*` unpacks the list into separate positional arguments,
# whereas np.concatenate takes the sequence of arrays as its first argument —
# confirm this call works as intended.
elif isinstance(uniq_type, gym.spaces.Box):
low = np.concatenate(*[space.low for space in tuple_space.spaces], axis=0)
high = np.concatenate(*[space.high for space in tuple_space.spaces], axis=0)
flat_space = gym.spaces.Box(low=low, high=high)
# NOTE(review): NumPy offers ndarray.flatten() and np.ravel(); a top-level
# np.flatten is not part of its documented API — confirm.
def flatten(x):
return np.flatten(x)
# Per-member element counts and their running totals, used as slice offsets.
def unflatten(x):
sizes = [np.prod(space.shape) for space in tuple_space.spaces]
start = np.cumsum(sizes)
end = start[1:] + len(x)
def make_pdtype(ac_space):
    """Pick the probability-distribution type that matches an action space.

    Box (one-dimensional shape only) maps to DiagGaussianPdType, Discrete to
    CategoricalPdType, MultiDiscrete to MultiCategoricalPdType and MultiBinary
    to BernoulliPdType. Any other space raises NotImplementedError.
    """
    from gym import spaces

    if isinstance(ac_space, spaces.Box):
        # Only flat (rank-1) Box spaces are handled.
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])

    if isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)

    if isinstance(ac_space, spaces.MultiDiscrete):
        lower, upper = ac_space.low, ac_space.high
        return MultiCategoricalPdType(lower, upper)

    if isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)

    raise NotImplementedError