import gym
import numpy as np
from gym import spaces
from sb3_contrib.common.wrappers import TimeFeatureWrapper # noqa: F401 (backward compatibility)
class DoneOnSuccessWrapper(gym.Wrapper):
    """
    End the episode once the task is solved and shift every reward by a constant.
    Useful for GoalEnv.

    :param env: Environment to wrap
    :param reward_offset: Constant added to every reward
    :param n_successes: Number of consecutive successful steps required
        before the episode is flagged as done
    """

    def __init__(self, env: gym.Env, reward_offset: float = 0.0, n_successes: int = 1):
        super().__init__(env)
        self.reward_offset = reward_offset
        self.n_successes = n_successes
        # Length of the current streak of successful steps
        self.current_successes = 0

    def reset(self):
        # A new episode always starts with an empty streak
        self.current_successes = 0
        return self.env.reset()

    def step(self, action):
        obs, reward, done, info = self.env.step(action)

        # Success is read from ``info["is_success"]``; any failed step breaks the streak
        succeeded = info.get("is_success", False)
        self.current_successes = (self.current_successes + 1) if succeeded else 0

        # Terminate when the wrapped env says so or when the streak is long enough
        streak_complete = self.current_successes >= self.n_successes
        done = done or streak_complete

        reward += self.reward_offset
        return obs, reward, done, info

    def compute_reward(self, achieved_goal, desired_goal, info):
        # Apply the same offset as in ``step()`` so both rewards agree
        return self.env.compute_reward(achieved_goal, desired_goal, info) + self.reward_offset
class ActionNoiseWrapper(gym.Wrapper):
    """
    Perturb each action with zero-mean gaussian noise before it reaches
    the environment (the agent is not informed), to test the robustness of the control.

    :param env: Environment to wrap
    :param noise_std: Standard deviation of the noise
    """

    def __init__(self, env: gym.Env, noise_std: float = 0.1):
        super().__init__(env)
        self.noise_std = noise_std

    def step(self, action):
        # One independent sample per action dimension
        mean = np.zeros_like(action)
        std = np.ones_like(action) * self.noise_std
        perturbation = np.random.normal(mean, std)
        return self.env.step(action + perturbation)
class ActionSmoothingWrapper(gym.Wrapper):
    """
    Low-pass filter the actions with an exponential moving average.

    :param env: Environment to wrap
    :param smoothing_coef: Smoothing coefficient (0 no smoothing, 1 very smooth)
    """

    def __init__(self, env: gym.Env, smoothing_coef: float = 0.0):
        super().__init__(env)
        self.smoothing_coef = smoothing_coef
        # Running average of the actions, ``None`` until the first step of an episode
        self.smoothed_action = None
        # Latent-space smoothing variant (not enabled), from
        # https://github.com/rail-berkeley/softlearning/issues/3
        # self.alpha = self.smoothing_coef
        # self.beta = np.sqrt(1 - self.alpha ** 2) / (1 - self.alpha)

    def reset(self):
        # Forget the average of the previous episode
        self.smoothed_action = None
        return self.env.reset()

    def step(self, action):
        coef = self.smoothing_coef
        if self.smoothed_action is None:
            # The average starts from zero at the beginning of an episode
            self.smoothed_action = np.zeros_like(action)
        blended = coef * self.smoothed_action + (1 - coef) * action
        self.smoothed_action = blended
        return self.env.step(blended)
class DelayedRewardWrapper(gym.Wrapper):
    """
    Hand out the reward only every ``delay`` steps (and at the end of the episode),
    which makes the task harder but more realistic.
    The rewards collected in between are summed and returned at once.

    :param env: Environment to wrap
    :param delay: Number of steps the reward should be delayed.
    """

    def __init__(self, env: gym.Env, delay: int = 10):
        super().__init__(env)
        self.delay = delay
        # Steps taken since the last reset
        self.current_step = 0
        # Reward collected since the last payout
        self.accumulated_reward = 0.0

    def reset(self):
        self.current_step = 0
        self.accumulated_reward = 0.0
        return self.env.reset()

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.accumulated_reward += reward
        self.current_step += 1

        # Pay out on every ``delay``-th step, and always on the last step of the episode
        payout_due = self.current_step % self.delay == 0 or done
        if not payout_due:
            return obs, 0.0, done, info

        delayed_reward = self.accumulated_reward
        self.accumulated_reward = 0.0
        return obs, delayed_reward, done, info
class HistoryWrapper(gym.Wrapper):
    """
    Stack past observations and actions to give a history to the agent.

    The observation returned by ``reset()``/``step()`` is the concatenation of
    the last ``horizon`` observations followed by the last ``horizon`` actions,
    each stored oldest first. Both histories are zero-filled at the start of an episode.

    :param env: Environment to wrap. Its observation space must be a ``spaces.Box``;
        the action space is accessed through ``.low``/``.high``.
    :param horizon: Number of steps to keep in the history.
    """

    def __init__(self, env: gym.Env, horizon: int = 2):
        assert isinstance(env.observation_space, spaces.Box)

        wrapped_obs_space = env.observation_space
        wrapped_action_space = env.action_space

        # The history buffers store whole frames back to back
        # ([frame_0, frame_1, ...]), so the bounds must be tiled the same way.
        # np.repeat(..., axis=-1) would interleave them ([a, a, b, b] instead of
        # [a, b, a, b]) and give wrong limits when low/high differ per dimension.
        low_obs = np.tile(wrapped_obs_space.low, horizon)
        high_obs = np.tile(wrapped_obs_space.high, horizon)

        low_action = np.tile(wrapped_action_space.low, horizon)
        high_action = np.tile(wrapped_action_space.high, horizon)

        low = np.concatenate((low_obs, low_action))
        high = np.concatenate((high_obs, high_action))

        # Overwrite the observation space
        env.observation_space = spaces.Box(low=low, high=high, dtype=wrapped_obs_space.dtype)

        super().__init__(env)

        self.horizon = horizon
        self.low_action, self.high_action = low_action, high_action
        self.low_obs, self.high_obs = low_obs, high_obs
        self.low, self.high = low, high
        # Rolling buffers, newest entry in the last slot
        self.obs_history = np.zeros(low_obs.shape, low_obs.dtype)
        self.action_history = np.zeros(low_action.shape, low_action.dtype)

    def _create_obs_from_history(self):
        """Return the stacked observations followed by the stacked actions."""
        return np.concatenate((self.obs_history, self.action_history))

    def reset(self):
        # Flush the history
        self.obs_history[...] = 0
        self.action_history[...] = 0
        obs = self.env.reset()
        # The first observation goes in the newest slot
        self.obs_history[..., -obs.shape[-1] :] = obs
        return self._create_obs_from_history()

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        last_ax_size = obs.shape[-1]

        # Shift one frame to the left: the oldest entry wraps around to the
        # end and is overwritten by the newest one
        self.obs_history = np.roll(self.obs_history, shift=-last_ax_size, axis=-1)
        self.obs_history[..., -last_ax_size:] = obs

        self.action_history = np.roll(self.action_history, shift=-action.shape[-1], axis=-1)
        self.action_history[..., -action.shape[-1] :] = action
        return self._create_obs_from_history(), reward, done, info
class HistoryWrapperObsDict(gym.Wrapper):
    """
    History Wrapper for dict observation.

    Only the ``"observation"`` entry of the dict is replaced: it becomes the
    concatenation of the last ``horizon`` observations followed by the last
    ``horizon`` actions, each stored oldest first. Other entries are passed
    through untouched. Both histories are zero-filled at the start of an episode.

    :param env: Environment to wrap. ``env.observation_space.spaces["observation"]``
        must be a ``spaces.Box``; the action space is accessed through ``.low``/``.high``.
    :param horizon: Number of steps to keep in the history.
    """

    def __init__(self, env: gym.Env, horizon: int = 2):
        assert isinstance(env.observation_space.spaces["observation"], spaces.Box)

        wrapped_obs_space = env.observation_space.spaces["observation"]
        wrapped_action_space = env.action_space

        # The history buffers store whole frames back to back
        # ([frame_0, frame_1, ...]), so the bounds must be tiled the same way.
        # np.repeat(..., axis=-1) would interleave them ([a, a, b, b] instead of
        # [a, b, a, b]) and give wrong limits when low/high differ per dimension.
        low_obs = np.tile(wrapped_obs_space.low, horizon)
        high_obs = np.tile(wrapped_obs_space.high, horizon)

        low_action = np.tile(wrapped_action_space.low, horizon)
        high_action = np.tile(wrapped_action_space.high, horizon)

        low = np.concatenate((low_obs, low_action))
        high = np.concatenate((high_obs, high_action))

        # Overwrite the observation space
        env.observation_space.spaces["observation"] = spaces.Box(low=low, high=high, dtype=wrapped_obs_space.dtype)

        super().__init__(env)

        self.horizon = horizon
        self.low_action, self.high_action = low_action, high_action
        self.low_obs, self.high_obs = low_obs, high_obs
        self.low, self.high = low, high
        # Rolling buffers, newest entry in the last slot
        self.obs_history = np.zeros(low_obs.shape, low_obs.dtype)
        self.action_history = np.zeros(low_action.shape, low_action.dtype)

    def _create_obs_from_history(self):
        """Return the stacked observations followed by the stacked actions."""
        return np.concatenate((self.obs_history, self.action_history))

    def reset(self):
        # Flush the history
        self.obs_history[...] = 0
        self.action_history[...] = 0
        obs_dict = self.env.reset()
        obs = obs_dict["observation"]
        # The first observation goes in the newest slot
        self.obs_history[..., -obs.shape[-1] :] = obs

        # The dict returned by the wrapped env is updated in place
        obs_dict["observation"] = self._create_obs_from_history()
        return obs_dict

    def step(self, action):
        obs_dict, reward, done, info = self.env.step(action)
        obs = obs_dict["observation"]
        last_ax_size = obs.shape[-1]

        # Shift one frame to the left: the oldest entry wraps around to the
        # end and is overwritten by the newest one
        self.obs_history = np.roll(self.obs_history, shift=-last_ax_size, axis=-1)
        self.obs_history[..., -last_ax_size:] = obs

        self.action_history = np.roll(self.action_history, shift=-action.shape[-1], axis=-1)
        self.action_history[..., -action.shape[-1] :] = action

        # The dict returned by the wrapped env is updated in place
        obs_dict["observation"] = self._create_obs_from_history()
        return obs_dict, reward, done, info
class FrameSkip(gym.Wrapper):
    """
    Frameskipping: the agent only sees every ``skip``-th frame.

    :param env: the environment
    :param skip: number of consecutive frames on which each action is applied
    """

    def __init__(self, env: gym.Env, skip: int = 4):
        super().__init__(env)
        self._skip = skip

    def step(self, action: np.ndarray):
        """
        Apply the same action for up to ``skip`` frames and sum the rewards.
        Stops early if the episode ends.

        :param action: the action
        :return: observation, reward, done, information (observation and
            information come from the last frame that was played)
        """
        done = None
        total_reward = 0.0
        frames_left = self._skip
        while frames_left > 0:
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            frames_left -= 1
            if done:
                # Do not step an environment that is already finished
                break
        return obs, total_reward, done, info

    def reset(self):
        return self.env.reset()
class MaskVelocityWrapper(gym.ObservationWrapper):
    """
    Observation wrapper that zeroes out the velocity terms of the
    observations, in order to make the MDP partially observable.
    Adapted from https://github.com/LiuWenlin595/FinalProject.

    :param env: Gym environment
    """

    # Supported envs: env id -> indices of the observation entries to hide
    velocity_indices = {
        "CartPole-v1": np.array([1, 3]),
        "MountainCar-v0": np.array([1]),
        "MountainCarContinuous-v0": np.array([1]),
        "Pendulum-v1": np.array([2]),
        "LunarLander-v2": np.array([2, 3, 5]),
        "LunarLanderContinuous-v2": np.array([2, 3, 5]),
    }

    def __init__(self, env: gym.Env):
        super().__init__(env)

        env_id: str = env.unwrapped.spec.id
        # Multiplicative mask: start from all ones (nothing hidden)
        self.mask = np.ones_like(env.observation_space.sample())

        try:
            hidden = self.velocity_indices[env_id]
        except KeyError as e:
            raise NotImplementedError(f"Velocity masking not implemented for {env_id}") from e
        # Mask velocity
        self.mask[hidden] = 0.0

    def observation(self, observation: np.ndarray) -> np.ndarray:
        # Entries with a zero in the mask are hidden from the agent
        return observation * self.mask