# # Copyright (c) 2017 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import random from enum import Enum from typing import Union import numpy as np try: from dm_control import suite from dm_control.suite.wrappers import pixels except ImportError: from rl_coach.logger import failed_imports failed_imports.append("DeepMind Control Suite") from rl_coach.base_parameters import VisualizationParameters from rl_coach.environments.environment import Environment, EnvironmentParameters, LevelSelection from rl_coach.filters.filter import NoInputFilter, NoOutputFilter from rl_coach.spaces import BoxActionSpace, ImageObservationSpace, VectorObservationSpace, StateSpace class ObservationType(Enum): Measurements = 1 Image = 2 Image_and_Measurements = 3 # Parameters class ControlSuiteEnvironmentParameters(EnvironmentParameters): def __init__(self, level=None): super().__init__(level=level) self.observation_type = ObservationType.Measurements self.default_input_filter = ControlSuiteInputFilter self.default_output_filter = ControlSuiteOutputFilter @property def path(self): return 'rl_coach.environments.control_suite_environment:ControlSuiteEnvironment' """ ControlSuite Environment Components """ ControlSuiteInputFilter = NoInputFilter() ControlSuiteOutputFilter = NoOutputFilter() control_suite_envs = {':'.join(env): ':'.join(env) for env in suite.BENCHMARKING} # Environment class ControlSuiteEnvironment(Environment): def __init__(self, level: LevelSelection, frame_skip: int, visualization_parameters: VisualizationParameters, target_success_rate: float=1.0, seed: Union[None, int]=None, human_control: bool=False, observation_type: ObservationType=ObservationType.Measurements, custom_reward_threshold: Union[int, float]=None, **kwargs): """ :param level: (str) A string representing the control suite level to run. This can also be a LevelSelection object. For example, cartpole:swingup. :param frame_skip: (int) The number of frames to skip between any two actions given by the agent. The action will be repeated for all the skipped frames. :param visualization_parameters: (VisualizationParameters) The parameters used for visualizing the environment, such as the render flag, storing videos etc. :param target_success_rate: (float) Stop experiment if given target success rate was achieved. :param seed: (int) A seed to use for the random number generator when running the environment. :param human_control: (bool) A flag that allows controlling the environment using the keyboard keys. :param observation_type: (ObservationType) An enum which defines which observation to use. The current options are to use: * Measurements only - a vector of joint torques and similar measurements * Image only - an image of the environment as seen by a camera attached to the simulator * Measurements & Image - both type of observations will be returned in the state using the keys 'measurements' and 'pixels' respectively. :param custom_reward_threshold: (float) Allows defining a custom reward that will be used to decide when the agent succeeded in passing the environment. """ super().__init__(level, seed, frame_skip, human_control, custom_reward_threshold, visualization_parameters, target_success_rate) self.observation_type = observation_type # load and initialize environment domain_name, task_name = self.env_id.split(":") self.env = suite.load(domain_name=domain_name, task_name=task_name, task_kwargs={'random': seed}) if observation_type != ObservationType.Measurements: self.env = pixels.Wrapper(self.env, pixels_only=observation_type == ObservationType.Image) # seed if self.seed is not None: np.random.seed(self.seed) random.seed(self.seed) self.state_space = StateSpace({}) # image observations if observation_type != ObservationType.Measurements: self.state_space['pixels'] = ImageObservationSpace(shape=self.env.observation_spec()['pixels'].shape, high=255) # measurements observations if observation_type != ObservationType.Image: measurements_space_size = 0 measurements_names = [] for observation_space_name, observation_space in self.env.observation_spec().items(): if len(observation_space.shape) == 0: measurements_space_size += 1 measurements_names.append(observation_space_name) elif len(observation_space.shape) == 1: measurements_space_size += observation_space.shape[0] measurements_names.extend(["{}_{}".format(observation_space_name, i) for i in range(observation_space.shape[0])]) self.state_space['measurements'] = VectorObservationSpace(shape=measurements_space_size, measurements_names=measurements_names) # actions self.action_space = BoxActionSpace( shape=self.env.action_spec().shape[0], low=self.env.action_spec().minimum, high=self.env.action_spec().maximum ) # initialize the state by getting a new state from the environment self.reset_internal_state(True) # render if self.is_rendered: image = self.get_rendered_image() scale = 1 if self.human_control: scale = 2 if not self.native_rendering: self.renderer.create_screen(image.shape[1]*scale, image.shape[0]*scale) self.target_success_rate = target_success_rate def _update_state(self): self.state = {} if self.observation_type != ObservationType.Measurements: self.pixels = self.last_result.observation['pixels'] self.state['pixels'] = self.pixels if self.observation_type != ObservationType.Image: self.measurements = np.array([]) for sub_observation in self.last_result.observation.values(): if isinstance(sub_observation, np.ndarray) and len(sub_observation.shape) == 1: self.measurements = np.concatenate((self.measurements, sub_observation)) else: self.measurements = np.concatenate((self.measurements, np.array([sub_observation]))) self.state['measurements'] = self.measurements self.reward = self.last_result.reward if self.last_result.reward is not None else 0 self.done = self.last_result.last() def _take_action(self, action): if type(self.action_space) == BoxActionSpace: action = self.action_space.clip_action_to_space(action) self.last_result = self.env.step(action) def _restart_environment_episode(self, force_environment_reset=False): self.last_result = self.env.reset() def _render(self): pass def get_rendered_image(self): return self.env.physics.render(camera_id=0) def get_target_success_rate(self) -> float: return self.target_success_rate