mirror of
https://github.com/gryf/coach.git
synced 2025-12-17 11:10:20 +01:00
* updating the documentation website * adding the built docs * update of api docstrings across coach and tutorials 0-2 * added some missing api documentation * New Sphinx based documentation
199 lines
7.9 KiB
Python
199 lines
7.9 KiB
Python
#
|
|
# Copyright (c) 2017 Intel Corporation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
|
|
|
|
import random
|
|
from enum import Enum
|
|
from typing import Union
|
|
|
|
import numpy as np
|
|
|
|
try:
|
|
from dm_control import suite
|
|
from dm_control.suite.wrappers import pixels
|
|
except ImportError:
|
|
from rl_coach.logger import failed_imports
|
|
failed_imports.append("DeepMind Control Suite")
|
|
|
|
from rl_coach.base_parameters import VisualizationParameters
|
|
from rl_coach.environments.environment import Environment, EnvironmentParameters, LevelSelection
|
|
from rl_coach.filters.filter import NoInputFilter, NoOutputFilter
|
|
from rl_coach.spaces import BoxActionSpace, ImageObservationSpace, VectorObservationSpace, StateSpace
|
|
|
|
|
|
class ObservationType(Enum):
    """Which observation modality the environment should expose to the agent."""
    Measurements = 1               # low-dimensional vector of simulator measurements
    Image = 2                      # rendered camera image only
    Image_and_Measurements = 3     # both, under the 'pixels' and 'measurements' keys
|
|
|
|
|
|
# Parameters
class ControlSuiteEnvironmentParameters(EnvironmentParameters):
    """Parameter container used by coach to configure and instantiate a ControlSuiteEnvironment."""

    def __init__(self, level=None):
        super().__init__(level=level)
        # pass-through filters by default; observations default to the measurement vector
        self.default_input_filter = ControlSuiteInputFilter
        self.default_output_filter = ControlSuiteOutputFilter
        self.observation_type = ObservationType.Measurements

    @property
    def path(self):
        """Dotted 'module:class' path coach's factory uses to build the environment."""
        return 'rl_coach.environments.control_suite_environment:ControlSuiteEnvironment'
|
|
|
|
|
|
"""
|
|
ControlSuite Environment Components
|
|
"""
|
|
ControlSuiteInputFilter = NoInputFilter()
|
|
ControlSuiteOutputFilter = NoOutputFilter()
|
|
|
|
control_suite_envs = {':'.join(env): ':'.join(env) for env in suite.BENCHMARKING}
|
|
|
|
|
|
# Environment
class ControlSuiteEnvironment(Environment):
    """Coach environment wrapper around the DeepMind Control Suite (dm_control)."""

    def __init__(self, level: LevelSelection, frame_skip: int, visualization_parameters: VisualizationParameters,
                 target_success_rate: float=1.0, seed: Union[None, int]=None, human_control: bool=False,
                 observation_type: ObservationType=ObservationType.Measurements,
                 custom_reward_threshold: Union[int, float]=None, **kwargs):
        """
        :param level: (str)
            A string representing the control suite level to run. This can also be a LevelSelection object.
            For example, cartpole:swingup.

        :param frame_skip: (int)
            The number of frames to skip between any two actions given by the agent. The action will be repeated
            for all the skipped frames.

        :param visualization_parameters: (VisualizationParameters)
            The parameters used for visualizing the environment, such as the render flag, storing videos etc.

        :param target_success_rate: (float)
            Stop experiment if given target success rate was achieved.

        :param seed: (int)
            A seed to use for the random number generator when running the environment.

        :param human_control: (bool)
            A flag that allows controlling the environment using the keyboard keys.

        :param observation_type: (ObservationType)
            An enum which defines which observation to use. The current options are to use:
            * Measurements only - a vector of joint torques and similar measurements
            * Image only - an image of the environment as seen by a camera attached to the simulator
            * Measurements & Image - both type of observations will be returned in the state using the keys
              'measurements' and 'pixels' respectively.

        :param custom_reward_threshold: (int or float)
            Allows defining a custom reward that will be used to decide when the agent succeeded in passing the environment.
        """
        super().__init__(level, seed, frame_skip, human_control, custom_reward_threshold,
                         visualization_parameters, target_success_rate)

        self.observation_type = observation_type

        # load and initialize the dm_control environment; env_id is "domain:task"
        domain_name, task_name = self.env_id.split(":")
        self.env = suite.load(domain_name=domain_name, task_name=task_name, task_kwargs={'random': seed})

        # wrap with the pixels wrapper whenever images are requested; pixels_only
        # drops the measurement entries when only images are wanted
        if observation_type != ObservationType.Measurements:
            self.env = pixels.Wrapper(self.env, pixels_only=observation_type == ObservationType.Image)

        # seed python's and numpy's global RNGs as well, for reproducibility of
        # any randomness outside the simulator itself
        if self.seed is not None:
            np.random.seed(self.seed)
            random.seed(self.seed)

        self.state_space = StateSpace({})

        # image observations
        if observation_type != ObservationType.Measurements:
            self.state_space['pixels'] = ImageObservationSpace(shape=self.env.observation_spec()['pixels'].shape,
                                                               high=255)

        # measurements observations: flatten every scalar and 1-D entry of the
        # observation spec into a single named vector space
        if observation_type != ObservationType.Image:
            measurements_space_size = 0
            measurements_names = []
            for observation_space_name, observation_space in self.env.observation_spec().items():
                if len(observation_space.shape) == 0:
                    # scalar entry contributes one measurement under its own name
                    measurements_space_size += 1
                    measurements_names.append(observation_space_name)
                elif len(observation_space.shape) == 1:
                    # vector entry contributes one measurement per element, named "<entry>_<i>"
                    measurements_space_size += observation_space.shape[0]
                    measurements_names.extend(["{}_{}".format(observation_space_name, i) for i in
                                               range(observation_space.shape[0])])
            self.state_space['measurements'] = VectorObservationSpace(shape=measurements_space_size,
                                                                      measurements_names=measurements_names)

        # actions: continuous box taken straight from the simulator's action spec
        self.action_space = BoxActionSpace(
            shape=self.env.action_spec().shape[0],
            low=self.env.action_spec().minimum,
            high=self.env.action_spec().maximum
        )

        # initialize the state by getting a new state from the environment
        self.reset_internal_state(True)

        # render
        if self.is_rendered:
            image = self.get_rendered_image()
            # double the window size when a human is controlling, for visibility
            scale = 2 if self.human_control else 1
            if not self.native_rendering:
                self.renderer.create_screen(image.shape[1]*scale, image.shape[0]*scale)

        self.target_success_rate = target_success_rate

    def _update_state(self):
        """Translate the last dm_control TimeStep into coach's state/reward/done fields."""
        self.state = {}

        if self.observation_type != ObservationType.Measurements:
            self.pixels = self.last_result.observation['pixels']
            self.state['pixels'] = self.pixels

        if self.observation_type != ObservationType.Image:
            # concatenate all scalar/1-D observation entries, in spec order, into one vector
            self.measurements = np.array([])
            for sub_observation in self.last_result.observation.values():
                if isinstance(sub_observation, np.ndarray) and len(sub_observation.shape) == 1:
                    self.measurements = np.concatenate((self.measurements, sub_observation))
                else:
                    self.measurements = np.concatenate((self.measurements, np.array([sub_observation])))
            self.state['measurements'] = self.measurements

        # dm_control returns reward=None on the first step of an episode
        self.reward = self.last_result.reward if self.last_result.reward is not None else 0

        self.done = self.last_result.last()

    def _take_action(self, action):
        """Clip the action to the environment's bounds and step the simulator."""
        # fix: use isinstance instead of an exact type() == comparison, so
        # subclasses of BoxActionSpace are clipped as well
        if isinstance(self.action_space, BoxActionSpace):
            action = self.action_space.clip_action_to_space(action)

        self.last_result = self.env.step(action)

    def _restart_environment_episode(self, force_environment_reset=False):
        """Reset the simulator and keep its initial TimeStep as the last result."""
        self.last_result = self.env.reset()

    def _render(self):
        # native rendering is handled by coach's renderer via get_rendered_image()
        pass

    def get_rendered_image(self):
        """Return an RGB frame rendered from the simulator's camera 0."""
        return self.env.physics.render(camera_id=0)

    def get_target_success_rate(self) -> float:
        """Return the success rate at which the experiment should stop."""
        return self.target_success_rate