from typing import Any, ClassVar, Dict, Optional, SupportsFloat, Tuple
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from gymnasium.core import ObsType
from sb3_contrib.common.wrappers import TimeFeatureWrapper # noqa: F401 (backward compatibility)
from stable_baselines3.common.type_aliases import GymResetReturn, GymStepReturn


class TruncatedOnSuccessWrapper(gym.Wrapper):
    """
    Truncate the episode once ``n_successes`` consecutive successes
    (as reported by ``info["is_success"]``) have been reached,
    and offset the reward. Useful for GoalEnv.

    :param env: Environment to wrap
    :param reward_offset: Constant offset added to every reward
    :param n_successes: Number of consecutive successes before truncating the episode
    """

    def __init__(self, env: gym.Env, reward_offset: float = 0.0, n_successes: int = 1):
super().__init__(env)
self.reward_offset = reward_offset
self.n_successes = n_successes
self.current_successes = 0

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None) -> GymResetReturn:
self.current_successes = 0
assert options is None, "Options not supported for now"
return self.env.reset(seed=seed)

    def step(self, action) -> GymStepReturn:
obs, reward, terminated, truncated, info = self.env.step(action)
if info.get("is_success", False):
self.current_successes += 1
else:
self.current_successes = 0
# number of successes in a row
truncated = truncated or self.current_successes >= self.n_successes
reward = float(reward) + self.reward_offset
return obs, reward, terminated, truncated, info

    def compute_reward(self, achieved_goal, desired_goal, info):
reward = self.env.compute_reward(achieved_goal, desired_goal, info)
return reward + self.reward_offset
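
# Usage sketch for TruncatedOnSuccessWrapper ("FetchReach-v2" is only an
# illustrative env id, not a requirement of this wrapper): the episode is
# truncated after ``n_successes`` consecutive steps where info["is_success"] is True.
#
#     env = TruncatedOnSuccessWrapper(gym.make("FetchReach-v2"), reward_offset=1.0, n_successes=3)
#     obs, info = env.reset()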


class ActionNoiseWrapper(gym.Wrapper[ObsType, np.ndarray, ObsType, np.ndarray]):
    """
    Add Gaussian noise to the action (without telling the agent),
    to test the robustness of the control.

    :param env: Environment to wrap
    :param noise_std: Standard deviation of the noise
    """

    def __init__(self, env: gym.Env, noise_std: float = 0.1):
super().__init__(env)
self.noise_std = noise_std

    def step(self, action: np.ndarray) -> Tuple[ObsType, SupportsFloat, bool, bool, Dict[str, Any]]:
assert isinstance(self.action_space, spaces.Box)
noise = np.random.normal(np.zeros_like(action), np.ones_like(action) * self.noise_std)
noisy_action = np.clip(action + noise, self.action_space.low, self.action_space.high)
return self.env.step(noisy_action)
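
# Usage sketch for ActionNoiseWrapper ("Pendulum-v1" is an illustrative env id):
# the executed action is a clipped, noisy version of the agent's action, which
# probes the robustness of a trained policy.
#
#     env = ActionNoiseWrapper(gym.make("Pendulum-v1"), noise_std=0.05)
#     obs, _ = env.reset(seed=0)
#     obs, reward, terminated, truncated, info = env.step(env.action_space.sample())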


class ActionSmoothingWrapper(gym.Wrapper):
    """
    Smooth the action using an exponential moving average.

    :param env: Environment to wrap
    :param smoothing_coef: Smoothing coefficient (0: no smoothing, 1: very smooth)
    """

    def __init__(self, env: gym.Env, smoothing_coef: float = 0.0):
super().__init__(env)
self.smoothing_coef = smoothing_coef
self.smoothed_action = None
# from https://github.com/rail-berkeley/softlearning/issues/3
# for smoothing latent space
# self.alpha = self.smoothing_coef
# self.beta = np.sqrt(1 - self.alpha ** 2) / (1 - self.alpha)

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None) -> GymResetReturn:
self.smoothed_action = None
assert options is None, "Options not supported for now"
return self.env.reset(seed=seed)

    def step(self, action) -> GymStepReturn:
if self.smoothed_action is None:
self.smoothed_action = np.zeros_like(action)
assert self.smoothed_action is not None
self.smoothed_action = self.smoothing_coef * self.smoothed_action + (1 - self.smoothing_coef) * action
return self.env.step(self.smoothed_action)
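
# Usage sketch for ActionSmoothingWrapper: with smoothing_coef=0.6, the executed
# action is 0.6 * previous_smoothed_action + 0.4 * new_action.
#
#     env = ActionSmoothingWrapper(gym.make("Pendulum-v1"), smoothing_coef=0.6)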


class DelayedRewardWrapper(gym.Wrapper):
    """
    Delay the reward by ``delay`` steps: the task becomes harder but more realistic.
    The reward is accumulated during the delayed steps.

    :param env: Environment to wrap
    :param delay: Number of steps by which the reward is delayed
    """

    def __init__(self, env: gym.Env, delay: int = 10):
super().__init__(env)
self.delay = delay
self.current_step = 0
self.accumulated_reward = 0.0

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None) -> GymResetReturn:
self.current_step = 0
self.accumulated_reward = 0.0
assert options is None, "Options not supported for now"
return self.env.reset(seed=seed)

    def step(self, action) -> GymStepReturn:
obs, reward, terminated, truncated, info = self.env.step(action)
self.accumulated_reward += float(reward)
self.current_step += 1
if self.current_step % self.delay == 0 or terminated or truncated:
reward = self.accumulated_reward
self.accumulated_reward = 0.0
else:
reward = 0.0
return obs, reward, terminated, truncated, info
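
# Usage sketch for DelayedRewardWrapper: with delay=10, step() returns 0.0 for
# nine steps, then the sum of the last ten rewards on the tenth step
# (or earlier if the episode ends).
#
#     env = DelayedRewardWrapper(gym.make("Pendulum-v1"), delay=10)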


class HistoryWrapper(gym.Wrapper[np.ndarray, np.ndarray, np.ndarray, np.ndarray]):
    """
    Stack past observations and actions to give a history to the agent.

    :param env: Environment to wrap
    :param horizon: Number of steps to keep in the history
    """

    def __init__(self, env: gym.Env, horizon: int = 2):
assert isinstance(env.observation_space, spaces.Box)
assert isinstance(env.action_space, spaces.Box)
wrapped_obs_space = env.observation_space
wrapped_action_space = env.action_space
low_obs = np.tile(wrapped_obs_space.low, horizon)
high_obs = np.tile(wrapped_obs_space.high, horizon)
low_action = np.tile(wrapped_action_space.low, horizon)
high_action = np.tile(wrapped_action_space.high, horizon)
low = np.concatenate((low_obs, low_action))
high = np.concatenate((high_obs, high_action))
# Overwrite the observation space
env.observation_space = spaces.Box(low=low, high=high, dtype=wrapped_obs_space.dtype) # type: ignore[arg-type]
super().__init__(env)
self.horizon = horizon
self.low_action, self.high_action = low_action, high_action
self.low_obs, self.high_obs = low_obs, high_obs
self.low, self.high = low, high
self.obs_history = np.zeros(low_obs.shape, low_obs.dtype)
self.action_history = np.zeros(low_action.shape, low_action.dtype)

    def _create_obs_from_history(self) -> np.ndarray:
return np.concatenate((self.obs_history, self.action_history))

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None) -> Tuple[np.ndarray, Dict]:
# Flush the history
self.obs_history[...] = 0
self.action_history[...] = 0
assert options is None, "Options not supported for now"
obs, info = self.env.reset(seed=seed)
self.obs_history[..., -obs.shape[-1] :] = obs
return self._create_obs_from_history(), info

    def step(self, action) -> Tuple[np.ndarray, SupportsFloat, bool, bool, Dict]:
obs, reward, terminated, truncated, info = self.env.step(action)
last_ax_size = obs.shape[-1]
self.obs_history = np.roll(self.obs_history, shift=-last_ax_size, axis=-1)
self.obs_history[..., -obs.shape[-1] :] = obs
self.action_history = np.roll(self.action_history, shift=-action.shape[-1], axis=-1)
self.action_history[..., -action.shape[-1] :] = action
return self._create_obs_from_history(), reward, terminated, truncated, info
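
# Usage sketch for HistoryWrapper: "Pendulum-v1" has a 3-dim observation and a
# 1-dim action, so horizon=2 gives a flat observation of size 2 * 3 + 2 * 1 = 8
# (stacked observations followed by stacked actions).
#
#     env = HistoryWrapper(gym.make("Pendulum-v1"), horizon=2)
#     assert env.observation_space.shape == (8,)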


class HistoryWrapperObsDict(gym.Wrapper):
    """
    History wrapper for dict observations.

    :param env: Environment to wrap
    :param horizon: Number of steps to keep in the history
    """

    def __init__(self, env: gym.Env, horizon: int = 2):
assert isinstance(env.observation_space, spaces.Dict)
assert isinstance(env.observation_space.spaces["observation"], spaces.Box)
assert isinstance(env.action_space, spaces.Box)
wrapped_obs_space = env.observation_space.spaces["observation"]
wrapped_action_space = env.action_space
low_obs = np.tile(wrapped_obs_space.low, horizon)
high_obs = np.tile(wrapped_obs_space.high, horizon)
low_action = np.tile(wrapped_action_space.low, horizon)
high_action = np.tile(wrapped_action_space.high, horizon)
low = np.concatenate((low_obs, low_action))
high = np.concatenate((high_obs, high_action))
# Overwrite the observation space
env.observation_space.spaces["observation"] = spaces.Box(
low=low,
high=high,
dtype=wrapped_obs_space.dtype, # type: ignore[arg-type]
)
super().__init__(env)
self.horizon = horizon
self.low_action, self.high_action = low_action, high_action
self.low_obs, self.high_obs = low_obs, high_obs
self.low, self.high = low, high
self.obs_history = np.zeros(low_obs.shape, low_obs.dtype)
self.action_history = np.zeros(low_action.shape, low_action.dtype)

    def _create_obs_from_history(self) -> np.ndarray:
return np.concatenate((self.obs_history, self.action_history))

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None) -> Tuple[Dict[str, np.ndarray], Dict]:
# Flush the history
self.obs_history[...] = 0
self.action_history[...] = 0
assert options is None, "Options not supported for now"
obs_dict, info = self.env.reset(seed=seed)
obs = obs_dict["observation"]
self.obs_history[..., -obs.shape[-1] :] = obs
obs_dict["observation"] = self._create_obs_from_history()
return obs_dict, info

    def step(self, action) -> Tuple[Dict[str, np.ndarray], SupportsFloat, bool, bool, Dict]:
obs_dict, reward, terminated, truncated, info = self.env.step(action)
obs = obs_dict["observation"]
last_ax_size = obs.shape[-1]
self.obs_history = np.roll(self.obs_history, shift=-last_ax_size, axis=-1)
self.obs_history[..., -obs.shape[-1] :] = obs
self.action_history = np.roll(self.action_history, shift=-action.shape[-1], axis=-1)
self.action_history[..., -action.shape[-1] :] = action
obs_dict["observation"] = self._create_obs_from_history()
return obs_dict, reward, terminated, truncated, info
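
# Usage sketch for HistoryWrapperObsDict, assuming a dict-observation goal env
# (the id "FetchReach-v2" is illustrative): only the "observation" key is
# stacked; the goal keys are left untouched.
#
#     env = HistoryWrapperObsDict(gym.make("FetchReach-v2"), horizon=2)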


class FrameSkip(gym.Wrapper):
    """
    Return only every ``skip``-th frame (frameskipping).

    :param env: Environment to wrap
    :param skip: Number of frames to skip (the same action is repeated and the rewards are summed)
    """

    def __init__(self, env: gym.Env, skip: int = 4):
super().__init__(env)
self._skip = skip

    def step(self, action) -> GymStepReturn:
        """
        Step the environment with the given action.
        Repeat the action and sum the rewards.

        :param action: The action
        :return: observation, reward, terminated, truncated, information
        """
total_reward = 0.0
for _ in range(self._skip):
obs, reward, terminated, truncated, info = self.env.step(action)
total_reward += float(reward)
if terminated or truncated:
break
return obs, total_reward, terminated, truncated, info
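
# Usage sketch for FrameSkip: each call to step() repeats the action 4 times
# and returns the summed reward, dividing the effective episode length by 4.
#
#     env = FrameSkip(gym.make("Pendulum-v1"), skip=4)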


class MaskVelocityWrapper(gym.ObservationWrapper):
    """
    Gym environment observation wrapper used to mask velocity terms in
    observations. The intention is to make the MDP partially observable.
    Adapted from https://github.com/LiuWenlin595/FinalProject.

    :param env: Gym environment
    """
# Supported envs
velocity_indices: ClassVar[Dict[str, np.ndarray]] = {
"CartPole-v1": np.array([1, 3]),
"MountainCar-v0": np.array([1]),
"MountainCarContinuous-v0": np.array([1]),
"Pendulum-v1": np.array([2]),
"LunarLander-v2": np.array([2, 3, 5]),
"LunarLanderContinuous-v2": np.array([2, 3, 5]),
}

    def __init__(self, env: gym.Env):
super().__init__(env)
assert env.unwrapped.spec is not None
env_id: str = env.unwrapped.spec.id
# By default no masking
self.mask = np.ones_like(env.observation_space.sample())
try:
# Mask velocity
self.mask[self.velocity_indices[env_id]] = 0.0
except KeyError as e:
raise NotImplementedError(f"Velocity masking not implemented for {env_id}") from e

    def observation(self, observation: np.ndarray) -> np.ndarray:
return observation * self.mask
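
# Usage sketch for MaskVelocityWrapper: on "CartPole-v1" the observation is
# [cart position, cart velocity, pole angle, pole angular velocity]; the wrapper
# zeroes indices 1 and 3, making the MDP partially observable.
#
#     env = MaskVelocityWrapper(gym.make("CartPole-v1"))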