Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, env):
"""Warp frames to 84x84 as done in the Nature paper and later work."""
gym.ObservationWrapper.__init__(self, env)
self.res = 84
self.observation_space = spaces.Box(low=0, high=255, shape=(self.res, self.res, 1))
if i == self._skip - 2: self._obs_buffer[0] = obs
if i == self._skip - 1: self._obs_buffer[1] = obs
total_reward += reward
if done:
break
# Note that the observation on the done=True frame
# doesn't matter
max_frame = self._obs_buffer.max(axis=0)
return max_frame, total_reward, done, info
def reset(self, **kwargs):
return self.env.reset(**kwargs)
class WarpFrame(gym.ObservationWrapper):
def __init__(self, env, width=84, height=84, grayscale=True, dict_space_key=None):
'''
Warp frames to 84x84 as done in the Nature paper and later work.
If the environment uses dictionary observations, `dict_space_key` can be specified which indicates which
observation should be warped.
'''
super().__init__(env)
self._width = width
self._height = height
self._grayscale = grayscale
self._key = dict_space_key
if self._grayscale:
num_colors = 1
else:
num_colors = 3
def __init__(self, env):
"""Warp frames to 84x84 as done in the Nature paper and later work."""
gym.ObservationWrapper.__init__(self, env)
self.width = 84
self.height = 84
self.observation_space = spaces.Box(low=0, high=255, shape=(self.height, self.width, 1))
if done:
break
# Note that the observation on the done=True frame
# doesn't matter
max_frame = self._obs_buffer.max(axis=0)
return max_frame, total_reward, done, info
class ClipRewardEnv(gym.RewardWrapper):
def _reward(self, reward):
"""Bin reward to {+1, 0, -1} by its sign."""
return np.sign(reward)
class WarpFrame(gym.ObservationWrapper):
def __init__(self, env):
"""Warp frames to 84x84 as done in the Nature paper and later work."""
gym.ObservationWrapper.__init__(self, env)
self.width = 84
self.height = 84
self.observation_space = spaces.Box(low=0, high=255, shape=(self.height, self.width, 1))
def _observation(self, frame):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
return frame[:, :, None]
class FrameStack(gym.Wrapper):
def __init__(self, env, k):
"""Stack k last frames.
# Checks whether done was caused my timit limits or not
class TimeLimitMask(gym.Wrapper):
def step(self, action):
obs, rew, done, info = self.env.step(action)
if done and self.env._max_episode_steps == self.env._elapsed_steps:
info['bad_transition'] = True
return obs, rew, done, info
def reset(self, **kwargs):
return self.env.reset(**kwargs)
# Can be used to test recurrent policies for Reacher-v2
class MaskGoal(gym.ObservationWrapper):
def observation(self, observation):
if self.env._elapsed_steps > 0:
observation[-2:] = 0
return observation
class TransposeObs(gym.ObservationWrapper):
def __init__(self, env=None):
"""
Transpose observation space (base class)
"""
super(TransposeObs, self).__init__(env)
class TransposeImage(TransposeObs):
def __init__(self, env=None, op=[2, 0, 1]):
def __init__(self, env):
gym.ObservationWrapper.__init__(self, env)
self.observation_space = gym.spaces.Box(low=0, high=1, shape=env.observation_space.shape, dtype=np.float32)
if i == self._skip - 1: self._obs_buffer[1] = obs
total_reward += reward
if done:
break
# Note that the observation on the done=True frame
# doesn't matter
max_frame = self._obs_buffer.max(axis=0)
return max_frame, total_reward, done, info
class ClipRewardEnv(gym.RewardWrapper):
def _reward(self, reward):
"""Bin reward to {+1, 0, -1} by its sign."""
return np.sign(reward)
class WarpFrame(gym.ObservationWrapper):
def __init__(self, env):
"""Warp frames to 84x84 as done in the Nature paper and later work."""
gym.ObservationWrapper.__init__(self, env)
self.width = 84
self.height = 84
self.observation_space = spaces.Box(low=0, high=255, shape=(self.height, self.width, 1))
def _observation(self, frame):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
return frame[:, :, None]
class FrameStack(gym.Wrapper):
def __init__(self, env, k):
"""Stack k last frames.
def make_critic(state_in, action_in, features):
critic_stream = keras.layers.concatenate([features, action_in])
critic_stream = _dense(critic_stream, width=64, batch_normalize=BATCH_NORMALIZE)
critic_stream = _dense(critic_stream, width=64, batch_normalize=BATCH_NORMALIZE)
critic_output = _dense(critic_stream, width=1, batch_normalize=False)
critic_network = keras.Model([state_in, action_in], critic_output)
critic_network.compile(keras.optimizers.Adam(CRITIC_LR), keras.losses.mean_squared_error)
return critic_network
class CarRacing(gym.ObservationWrapper):
def __init__(self):
super().__init__(env=gym.make("CarRacing-v0"))
def observation(self, observation):
return observation / 255.
BATCH_NORMALIZE = False
ACTOR_LR = 1e-4
CRITIC_LR = 1e-4
NUM_ENVS = 4
envs = [CarRacing() for _ in range(NUM_ENVS)]
test_env = CarRacing()
Provides utility functions for making Gym environments.
"""
import gym
from gym.spaces import Box
import numpy as np
from baselines.common.vec_env import VecEnvWrapper
from baselines.common.atari_wrappers import make_atari, wrap_deepmind
from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.common.vec_env.vec_normalize import (VecNormalize
as VecNormalize_)
class TransposeImage(gym.ObservationWrapper):
def __init__(self, env=None):
super(TransposeImage, self).__init__(env)
obs_shape = self.observation_space.shape
self.observation_space = Box(
self.observation_space.low[0, 0, 0],
self.observation_space.high[0, 0, 0],
[obs_shape[2], obs_shape[1], obs_shape[0]],
dtype=self.observation_space.dtype)
def observation(self, observation):
return observation.transpose(2, 0, 1)
class VecFrameStack(VecEnvWrapper):
def __init__(self, venv, nstack):
self.venv = venv
x_t = np.reshape(x_t, [84, 84, 1])
return x_t.astype(np.uint8)
class ImageToPyTorch(gym.ObservationWrapper):
def __init__(self, env):
super(ImageToPyTorch, self).__init__(env)
old_shape = self.observation_space.shape
self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(old_shape[-1], old_shape[0], old_shape[1]),
dtype=np.float32)
def observation(self, observation):
return np.moveaxis(observation, 2, 0)
class ScaledFloatFrame(gym.ObservationWrapper):
def observation(self, obs):
return np.array(obs).astype(np.float32) / 255.0
class BufferWrapper(gym.ObservationWrapper):
def __init__(self, env, n_steps, dtype=np.float32):
super(BufferWrapper, self).__init__(env)
self.dtype = dtype
old_space = env.observation_space
self.observation_space = gym.spaces.Box(old_space.low.repeat(n_steps, axis=0),
old_space.high.repeat(n_steps, axis=0), dtype=dtype)
def reset(self):
self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
return self.observation(self.env.reset())