1
0
mirror of https://github.com/gryf/coach.git synced 2025-12-17 19:20:19 +01:00
Files
coach/environments/environment_wrapper.py
Itai Caspi 125c7ee38d Release 0.9
Main changes are detailed below:

New features -
* CARLA 0.7 simulator integration
* Human control of the game play
* Recording of human game play and storing / loading the replay buffer
* Behavioral cloning agent and presets
* Golden tests for several presets
* Selecting between deep / shallow image embedders
* Rendering through pygame (with some boost in performance)

API changes -
* Improved environment wrapper API
* Added an evaluate flag to allow convenient evaluation of existing checkpoints
* Improve frameskip definition in Gym

Bug fixes -
* Fixed loading of checkpoints for agents with more than one network
* Fixed the N Step Q learning agent python3 compatibility
2017-12-19 19:27:16 +02:00

249 lines
9.4 KiB
Python

#
# Copyright (c) 2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np
from utils import *
from configurations import Preset
from renderer import Renderer
import operator
import time
class EnvironmentWrapper(object):
    """
    Base class that exposes an environment through a common step / reset interface.

    Concrete environments inherit from this class and override the hooks defined at the
    bottom of the class (_take_action, _update_state, _restart_environment_episode and,
    optionally, _preprocess_observation and get_rendered_image).
    """

    def __init__(self, tuning_parameters):
        """
        :param tuning_parameters: the preset holding the env / visualization / seed settings
        :type tuning_parameters: Preset
        """
        # env initialization - placeholder values which the hooks are expected to overwrite
        self.game = []
        self.actions = {}
        self.observation = []
        self.reward = 0
        self.done = False
        self.default_action = 0
        self.last_action_idx = 0
        self.episode_idx = 0
        self.last_episode_time = time.time()
        self.measurements = []
        self.info = []

        # action space description
        self.action_space_low = 0
        self.action_space_high = 0
        self.action_space_abs_range = 0
        self.actions_description = {}
        self.discrete_controls = True
        self.action_space_size = 0
        self.key_to_action = {}

        # observation description
        self.width = 1
        self.height = 1
        self.is_state_type_image = True
        self.measurements_size = 0

        # run configuration taken from the preset
        self.phase = RunPhase.TRAIN
        self.tp = tuning_parameters
        self.record_video_every = self.tp.visualization.record_video_every
        self.env_id = self.tp.env.level
        self.video_path = self.tp.visualization.video_path
        self.seed = self.tp.seed
        self.frame_skip = self.tp.env.frame_skip
        self.human_control = self.tp.env.human_control
        self.wait_for_explicit_human_action = False
        # human control implies rendering, even if rendering was not requested explicitly
        self.is_rendered = self.tp.visualization.render or self.human_control
        self.game_is_open = True

        self.renderer = Renderer()

    def _idx_to_action(self, action_idx):
        """
        Convert an action index to one of the environment available actions.
        For example, if the available actions are 4,5,6 then this function will map 0->4, 1->5, 2->6
        :param action_idx: an action index between 0 and self.action_space_size - 1
        :return: the action corresponding to the requested index
        """
        return self.actions[action_idx]

    def _action_to_idx(self, action):
        """
        Convert an environment action to one of the available actions of the wrapper.
        For example, if the available actions are 4,5,6 then this function will map 4->0, 5->1, 6->2
        :param action: the environment action
        :return: an action index between 0 and self.action_space_size - 1, or -1 if the action does not exist
        """
        for key, val in self.actions.items():
            if val == action:
                return key
        return -1

    def get_action_from_user(self):
        """
        Get an action from the user keyboard
        :return: the action index matching the currently pressed keys, or self.default_action
                 if the pressed keys do not match any action
        """
        if self.wait_for_explicit_human_action:
            # block until the renderer reports at least one pressed key
            while len(self.renderer.pressed_keys) == 0:
                self.renderer.get_events()

        if self.key_to_action == {}:
            # the keys are the numbers on the keyboard corresponding to the action index
            if len(self.renderer.pressed_keys) > 0:
                action_idx = self.renderer.pressed_keys[0] - ord("1")
                if 0 <= action_idx < self.action_space_size:
                    return action_idx
        else:
            # the keys are mapped through the environment to more intuitive keyboard keys.
            # the pressed keys are compared as a set, so the order of pressing does not matter
            pressed_keys = set(self.renderer.pressed_keys)
            for env_keys, action_idx in self.key_to_action.items():
                if set(env_keys) == pressed_keys:
                    return action_idx

        # return the default action so that the environment will continue running
        return self.default_action

    def _collect_state(self):
        """
        Refresh the state from the environment, render it if needed, and pack it into a dictionary.
        Shared by step() and reset().
        :return: A dictionary containing the observation, reward, done flag, action, measurements and info
        """
        self._update_state()

        # render before the preprocessing of the observation, so that the image will be in its original quality
        if self.is_rendered:
            self.render()

        self.observation = self._preprocess_observation(self.observation)

        return {'observation': self.observation,
                'reward': self.reward,
                'done': self.done,
                'action': self.last_action_idx,
                'measurements': self.measurements,
                'info': self.info}

    def step(self, action_idx):
        """
        Perform a single step on the environment using the given action
        :param action_idx: the action to perform on the environment
        :return: A dictionary containing the observation, reward, done flag, action, measurements and info
        """
        self.last_action_idx = action_idx
        self._take_action(action_idx)
        return self._collect_state()

    def render(self):
        """
        Send the image returned by get_rendered_image() to the renderer
        """
        self.renderer.render_image(self.get_rendered_image())

    def reset(self, force_environment_reset=False):
        """
        Reset the environment and all the variable of the wrapper
        :param force_environment_reset: forces environment reset even when the game did not end
        :return: A dictionary containing the observation, reward, done flag, action, measurements and info
        """
        self._restart_environment_episode(force_environment_reset)
        self.last_episode_time = time.time()
        self.done = False
        self.episode_idx += 1
        self.reward = 0.0
        self.last_action_idx = 0
        return self._collect_state()

    def get_random_action(self):
        """
        Returns an action picked uniformly from the available actions
        :return: a random action index when the controls are discrete, otherwise a sample drawn
                 uniformly between self.action_space_low and self.action_space_high
        """
        if self.discrete_controls:
            return np.random.choice(self.action_space_size)
        return np.random.uniform(self.action_space_low, self.action_space_high)

    def change_phase(self, phase):
        """
        Change the current phase of the run.
        This is useful when different behavior is expected when testing and training
        :param phase: The running phase of the algorithm
        :type phase: RunPhase
        """
        self.phase = phase

    def get_available_keys(self):
        """
        Return a list of tuples mapping between action names and the keyboard key that triggers them
        :return: a list of tuples mapping between action names and the keyboard key that triggers them
        """
        available_keys = []
        if self.key_to_action != {}:
            # list the actions in ascending order of their action index
            for key, idx in sorted(self.key_to_action.items(), key=operator.itemgetter(1)):
                # the empty key combination (no key pressed) has nothing to display
                if key != ():
                    key_names = [self.renderer.get_key_names([k])[0] for k in key]
                    available_keys.append((self.actions_description[idx], ' + '.join(key_names)))
        elif self.discrete_controls:
            # no explicit mapping - the number keys 1..action_space_size select the actions
            for action in range(self.action_space_size):
                available_keys.append(("Action {}".format(action + 1), action + 1))
        return available_keys

    # The following functions define the interaction with the environment.
    # Any new environment that inherits the EnvironmentWrapper class should use these signatures.
    # Some of these functions are optional - please read their description for more details.

    def _take_action(self, action_idx):
        """
        An environment dependent function that sends an action to the simulator.
        :param action_idx: the action to perform on the environment
        :return: None
        """
        pass

    def _preprocess_observation(self, observation):
        """
        Do initial observation preprocessing such as cropping, rgb2gray, rescale etc.
        Implementing this function is optional.
        :param observation: a raw observation from the environment
        :return: the preprocessed observation
        """
        return observation

    def _update_state(self):
        """
        Updates the state from the environment.
        Should update self.observation, self.reward, self.done, self.measurements and self.info
        :return: None
        """
        pass

    def _restart_environment_episode(self, force_environment_reset=False):
        """
        An environment dependent function that starts a new episode. Called by reset().
        :param force_environment_reset: Force the environment to reset even if the episode is not done yet.
        :return: None
        """
        pass

    def get_rendered_image(self):
        """
        Return a numpy array containing the image that will be rendered to the screen.
        This can be different from the observation. For example, mujoco's observation is a measurements vector.
        :return: numpy array containing the image that will be rendered to the screen
        """
        return self.observation