Source code for rl_zoo3.wrappers

import gym
import numpy as np
from gym import spaces
from sb3_contrib.common.wrappers import TimeFeatureWrapper  # noqa: F401 (backward compatibility)


class DoneOnSuccessWrapper(gym.Wrapper):
    """
    End the episode on success and shift every reward by a constant.
    Useful for GoalEnv.

    :param env: Environment to wrap
    :param reward_offset: Constant added to each reward
        (both in ``step()`` and in ``compute_reward()``)
    :param n_successes: Number of consecutive successful steps
        after which ``done`` is forced to be true
    """

    def __init__(self, env: gym.Env, reward_offset: float = 0.0, n_successes: int = 1):
        super().__init__(env)
        self.reward_offset = reward_offset
        self.n_successes = n_successes
        # Length of the current streak of successful steps
        self.current_successes = 0

    def reset(self):
        """Clear the success streak and reset the wrapped env."""
        self.current_successes = 0
        return self.env.reset()

    def step(self, action):
        """
        Step the wrapped env, update the success streak and offset the reward.

        :param action: Action forwarded to the wrapped env
        :return: observation, offset reward, done, information
        """
        obs, reward, done, info = self.env.step(action)

        # A step without ``is_success`` in ``info`` counts as a failure
        # and therefore breaks the streak
        succeeded = info.get("is_success", False)
        self.current_successes = self.current_successes + 1 if succeeded else 0

        # number of successes in a row
        streak_reached = self.current_successes >= self.n_successes
        done = done or streak_reached
        reward += self.reward_offset
        return obs, reward, done, info

    def compute_reward(self, achieved_goal, desired_goal, info):
        """Return the reward of the wrapped env shifted by ``reward_offset``."""
        return self.env.compute_reward(achieved_goal, desired_goal, info) + self.reward_offset
class ActionNoiseWrapper(gym.Wrapper):
    """
    Perturb the action with zero-mean gaussian noise before it reaches
    the wrapped env (the agent is not told), to test the robustness of the control.

    :param env: Environment to wrap
    :param noise_std: Standard deviation of the noise
    """

    def __init__(self, env: gym.Env, noise_std: float = 0.1):
        super().__init__(env)
        self.noise_std = noise_std

    def step(self, action):
        """
        Add noise to ``action`` and step the wrapped env with the result.

        :param action: Action chosen by the agent
        :return: Whatever the wrapped env returns for the noisy action
        """
        # One independent sample per action component, drawn with ``np.random.normal``
        mean = np.zeros_like(action)
        std = np.ones_like(action) * self.noise_std
        perturbation = np.random.normal(mean, std)
        return self.env.step(action + perturbation)
class ActionSmoothingWrapper(gym.Wrapper):
    """
    Low-pass filter the actions with an exponential moving average.

    :param env: Environment to wrap
    :param smoothing_coef: Smoothing coefficient (0 no smoothing, 1 very smooth)
    """

    def __init__(self, env: gym.Env, smoothing_coef: float = 0.0):
        super().__init__(env)
        self.smoothing_coef = smoothing_coef
        # Running average, ``None`` until the first step of an episode
        self.smoothed_action = None
        # from https://github.com/rail-berkeley/softlearning/issues/3
        # for smoothing latent space
        # self.alpha = self.smoothing_coef
        # self.beta = np.sqrt(1 - self.alpha ** 2) / (1 - self.alpha)

    def reset(self):
        """Forget the running average and reset the wrapped env."""
        self.smoothed_action = None
        return self.env.reset()

    def step(self, action):
        """
        Update the moving average with ``action`` and step the wrapped env with it.

        :param action: Raw action chosen by the agent
        :return: Whatever the wrapped env returns for the smoothed action
        """
        if self.smoothed_action is None:
            # First step after a reset: the average starts from zero
            self.smoothed_action = np.zeros_like(action)

        keep = self.smoothing_coef
        blended = keep * self.smoothed_action + (1 - keep) * action
        self.smoothed_action = blended
        return self.env.step(blended)
class DelayedRewardWrapper(gym.Wrapper):
    """
    Hand out the reward only every ``delay`` steps, it makes the task harder
    but more realistic. Rewards received in between are summed up
    and given all at once.

    :param env: Environment to wrap
    :param delay: Number of steps the reward should be delayed.
    """

    def __init__(self, env: gym.Env, delay: int = 10):
        super().__init__(env)
        self.delay = delay
        # Steps taken since the last reset
        self.current_step = 0
        # Reward collected but not yet given to the agent
        self.accumulated_reward = 0.0

    def reset(self):
        """Clear the step counter and the pending reward, then reset the wrapped env."""
        self.current_step = 0
        self.accumulated_reward = 0.0
        return self.env.reset()

    def step(self, action):
        """
        Step the wrapped env and withhold or release the accumulated reward.

        :param action: Action forwarded to the wrapped env
        :return: observation, delayed reward, done, information
        """
        obs, reward, done, info = self.env.step(action)
        self.accumulated_reward += reward
        self.current_step += 1

        # The pending reward is paid out every ``delay`` steps
        # and also when the episode ends, so nothing is lost
        release = self.current_step % self.delay == 0 or done
        if not release:
            return obs, 0.0, done, info

        payout = self.accumulated_reward
        self.accumulated_reward = 0.0
        return obs, payout, done, info
class HistoryWrapper(gym.Wrapper):
    """
    Stack past observations and actions to give an history to the agent.

    The observation returned by this wrapper is the concatenation of the last
    ``horizon`` observations followed by the last ``horizon`` actions,
    each group ordered from oldest to newest. Missing history
    (right after a reset) is filled with zeros.

    :param env: Environment to wrap, both its observation and action spaces must be ``Box``
    :param horizon: Number of steps to keep in the history.
    """

    def __init__(self, env: gym.Env, horizon: int = 2):
        assert isinstance(env.observation_space, spaces.Box)
        # ``low`` and ``high`` of the action space are needed below
        assert isinstance(env.action_space, spaces.Box)

        wrapped_obs_space = env.observation_space
        wrapped_action_space = env.action_space

        # The history buffers store whole frames back to back
        # ([o1, o2, o1, o2] for horizon=2), so the bounds must be tiled.
        # ``np.repeat`` would give [o1, o1, o2, o2], which is misaligned
        # as soon as the bounds differ from one dimension to another.
        low_obs = np.tile(wrapped_obs_space.low, horizon)
        high_obs = np.tile(wrapped_obs_space.high, horizon)

        low_action = np.tile(wrapped_action_space.low, horizon)
        high_action = np.tile(wrapped_action_space.high, horizon)

        low = np.concatenate((low_obs, low_action))
        high = np.concatenate((high_obs, high_action))

        # Overwrite the observation space
        # NOTE: this replaces the space on the wrapped env itself
        env.observation_space = spaces.Box(low=low, high=high, dtype=wrapped_obs_space.dtype)

        super().__init__(env)

        self.horizon = horizon
        self.low_action, self.high_action = low_action, high_action
        self.low_obs, self.high_obs = low_obs, high_obs
        self.low, self.high = low, high
        # Rolling buffers, the most recent entry is stored at the end
        self.obs_history = np.zeros(low_obs.shape, low_obs.dtype)
        self.action_history = np.zeros(low_action.shape, low_action.dtype)

    def _create_obs_from_history(self):
        """Return the observation history followed by the action history as one array."""
        return np.concatenate((self.obs_history, self.action_history))

    def reset(self):
        """
        Reset the wrapped env and restart the history from scratch.

        :return: Zero-padded history whose most recent observation slot holds the first observation
        """
        # Flush the history
        self.obs_history[...] = 0
        self.action_history[...] = 0
        obs = self.env.reset()
        self.obs_history[..., -obs.shape[-1] :] = obs
        return self._create_obs_from_history()

    def step(self, action):
        """
        Step the wrapped env and push the new observation and the action into the history.

        :param action: Action forwarded to the wrapped env, must expose ``shape`` (e.g. a numpy array)
        :return: stacked history, reward, done, information
        """
        obs, reward, done, info = self.env.step(action)

        # Shift the buffers left by one entry (the oldest entry wraps around
        # to the end) and overwrite the end with the newest entry
        obs_size = obs.shape[-1]
        self.obs_history = np.roll(self.obs_history, shift=-obs_size, axis=-1)
        self.obs_history[..., -obs_size:] = obs

        action_size = action.shape[-1]
        self.action_history = np.roll(self.action_history, shift=-action_size, axis=-1)
        self.action_history[..., -action_size:] = action
        return self._create_obs_from_history(), reward, done, info
class HistoryWrapperObsDict(gym.Wrapper):
    """
    History Wrapper for dict observation.

    Only the ``"observation"`` entry of the dict is replaced: it becomes the
    concatenation of the last ``horizon`` observations followed by the last
    ``horizon`` actions, each group ordered from oldest to newest.
    Missing history (right after a reset) is filled with zeros.
    The other entries of the dict are passed through untouched.

    :param env: Environment to wrap, its ``"observation"`` space and its action space must be ``Box``
    :param horizon: Number of steps to keep in the history.
    """

    def __init__(self, env: gym.Env, horizon: int = 2):
        assert isinstance(env.observation_space.spaces["observation"], spaces.Box)
        # ``low`` and ``high`` of the action space are needed below
        assert isinstance(env.action_space, spaces.Box)

        wrapped_obs_space = env.observation_space.spaces["observation"]
        wrapped_action_space = env.action_space

        # The history buffers store whole frames back to back
        # ([o1, o2, o1, o2] for horizon=2), so the bounds must be tiled.
        # ``np.repeat`` would give [o1, o1, o2, o2], which is misaligned
        # as soon as the bounds differ from one dimension to another.
        low_obs = np.tile(wrapped_obs_space.low, horizon)
        high_obs = np.tile(wrapped_obs_space.high, horizon)

        low_action = np.tile(wrapped_action_space.low, horizon)
        high_action = np.tile(wrapped_action_space.high, horizon)

        low = np.concatenate((low_obs, low_action))
        high = np.concatenate((high_obs, high_action))

        # Overwrite the observation space
        # NOTE: this replaces the entry in the wrapped env's own space dict
        env.observation_space.spaces["observation"] = spaces.Box(low=low, high=high, dtype=wrapped_obs_space.dtype)

        super().__init__(env)

        self.horizon = horizon
        self.low_action, self.high_action = low_action, high_action
        self.low_obs, self.high_obs = low_obs, high_obs
        self.low, self.high = low, high
        # Rolling buffers, the most recent entry is stored at the end
        self.obs_history = np.zeros(low_obs.shape, low_obs.dtype)
        self.action_history = np.zeros(low_action.shape, low_action.dtype)

    def _create_obs_from_history(self):
        """Return the observation history followed by the action history as one array."""
        return np.concatenate((self.obs_history, self.action_history))

    def reset(self):
        """
        Reset the wrapped env and restart the history from scratch.

        :return: Observation dict of the wrapped env whose ``"observation"`` entry
            is replaced by the zero-padded history
        """
        # Flush the history
        self.obs_history[...] = 0
        self.action_history[...] = 0
        obs_dict = self.env.reset()
        obs = obs_dict["observation"]
        self.obs_history[..., -obs.shape[-1] :] = obs

        obs_dict["observation"] = self._create_obs_from_history()
        return obs_dict

    def step(self, action):
        """
        Step the wrapped env and push the new observation and the action into the history.

        :param action: Action forwarded to the wrapped env, must expose ``shape`` (e.g. a numpy array)
        :return: observation dict (with stacked ``"observation"``), reward, done, information
        """
        obs_dict, reward, done, info = self.env.step(action)
        obs = obs_dict["observation"]

        # Shift the buffers left by one entry (the oldest entry wraps around
        # to the end) and overwrite the end with the newest entry
        obs_size = obs.shape[-1]
        self.obs_history = np.roll(self.obs_history, shift=-obs_size, axis=-1)
        self.obs_history[..., -obs_size:] = obs

        action_size = action.shape[-1]
        self.action_history = np.roll(self.action_history, shift=-action_size, axis=-1)
        self.action_history[..., -action_size:] = action

        obs_dict["observation"] = self._create_obs_from_history()
        return obs_dict, reward, done, info
class FrameSkip(gym.Wrapper):
    """
    Repeat each action for ``skip`` frames and only return the last one (frameskipping)

    :param env: the environment
    :param skip: number of frames each action is applied for
    """

    def __init__(self, env: gym.Env, skip: int = 4):
        super().__init__(env)
        self._skip = skip

    def step(self, action: np.ndarray):
        """
        Apply the same action up to ``skip`` times and sum up the rewards.
        The repetition stops early when the episode ends.

        :param action: the action
        :return: last observation, summed reward, done, last information
        """
        reward_sum = 0.0
        done = None
        for _frame in range(self._skip):
            obs, frame_reward, done, info = self.env.step(action)
            reward_sum += frame_reward
            if done:
                # Do not step an env that has already terminated
                break

        return obs, reward_sum, done, info

    def reset(self):
        """Reset the wrapped env, there is no state to clear in this wrapper."""
        return self.env.reset()
class MaskVelocityWrapper(gym.ObservationWrapper):
    """
    Observation wrapper that zeroes out the velocity terms of the observation,
    in order to make the MDP partially observable.
    Adapted from https://github.com/LiuWenlin595/FinalProject.

    :param env: Gym environment, its id must be one of the keys of ``velocity_indices``
    :raises NotImplementedError: If the env id has no entry in ``velocity_indices``
    """

    # Supported envs: env id -> indices of the observation entries to mask
    velocity_indices = {
        "CartPole-v1": np.array([1, 3]),
        "MountainCar-v0": np.array([1]),
        "MountainCarContinuous-v0": np.array([1]),
        "Pendulum-v1": np.array([2]),
        "LunarLander-v2": np.array([2, 3, 5]),
        "LunarLanderContinuous-v2": np.array([2, 3, 5]),
    }

    def __init__(self, env: gym.Env):
        super().__init__(env)

        env_id: str = env.unwrapped.spec.id
        # By default no masking: a sampled observation only serves as
        # a template for the shape and dtype of the mask
        self.mask = np.ones_like(env.observation_space.sample())

        try:
            hidden = self.velocity_indices[env_id]
        except KeyError as e:
            raise NotImplementedError(f"Velocity masking not implemented for {env_id}") from e

        # Mask velocity
        self.mask[hidden] = 0.0

    def observation(self, observation: np.ndarray) -> np.ndarray:
        """Return ``observation`` with the masked entries multiplied by zero."""
        return observation * self.mask