# Source: coach/environments/environment_wrapper.py
# (mirror of https://github.com/gryf/coach.git, synced 2025-12-17 19:20:19 +01:00)

#
# Copyright (c) 2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np
from utils import *
from configurations import Preset
from renderer import Renderer
import operator
import time
class EnvironmentWrapper(object):
    """
    Base class for environment wrappers.

    It holds the bookkeeping shared by all environments (current state, reward,
    done flag, action space description, rendering flags) and implements the
    generic step / reset flow. Concrete environments are expected to inherit
    from this class and override the hooks defined at the bottom of the class
    (_take_action, _update_state, _restart_environment_episode and, optionally,
    _preprocess_state and get_rendered_image).
    """

    def __init__(self, tuning_parameters):
        """
        :param tuning_parameters: the preset holding the env / visualization / seed settings read below
        :type tuning_parameters: Preset
        """
        # env initialization
        self.game = []
        self.actions = {}
        self.state = []
        self.reward = 0
        self.done = False
        self.default_action = 0
        self.last_action_idx = 0
        self.episode_idx = 0
        self.last_episode_time = time.time()
        self.info = []
        self.action_space_low = 0
        self.action_space_high = 0
        self.action_space_abs_range = 0
        self.actions_description = {}
        self.discrete_controls = True
        self.action_space_size = 0
        self.key_to_action = {}
        self.width = 1
        self.height = 1
        self.is_state_type_image = True
        self.measurements_size = 0
        self.phase = RunPhase.TRAIN

        # values taken from the preset
        self.tp = tuning_parameters
        self.record_video_every = self.tp.visualization.record_video_every
        self.env_id = self.tp.env.level
        self.video_path = self.tp.visualization.video_path
        self.seed = self.tp.seed
        self.frame_skip = self.tp.env.frame_skip
        self.human_control = self.tp.env.human_control
        self.wait_for_explicit_human_action = False
        # human control always implies rendering, even if rendering was not requested
        self.is_rendered = self.tp.visualization.render or self.human_control
        self.game_is_open = True
        self.renderer = Renderer()

    # Reading or writing `measurements` / `observation` directly on the wrapper always fails.
    # NOTE(review): presumably these values are meant to be carried inside self.state
    # (see get_rendered_image) - confirm.
    # An explicit raise is used instead of `assert False` so that the guard is not
    # stripped when Python runs with -O.
    @property
    def measurements(self):
        raise AssertionError("EnvironmentWrapper.measurements must not be accessed directly")

    @measurements.setter
    def measurements(self, value):
        raise AssertionError("EnvironmentWrapper.measurements must not be set directly")

    @property
    def observation(self):
        raise AssertionError("EnvironmentWrapper.observation must not be accessed directly")

    @observation.setter
    def observation(self, value):
        raise AssertionError("EnvironmentWrapper.observation must not be set directly")

    def _idx_to_action(self, action_idx):
        """
        Convert an action index to one of the environment available actions.
        For example, if the available actions are 4,5,6 then this function will map 0->4, 1->5, 2->6
        :param action_idx: an action index between 0 and self.action_space_size - 1
        :return: the action corresponding to the requested index
        """
        return self.actions[action_idx]

    def _action_to_idx(self, action):
        """
        Convert an environment action to one of the available actions of the wrapper.
        For example, if the available actions are 4,5,6 then this function will map 4->0, 5->1, 6->2
        :param action: the environment action
        :return: an action index between 0 and self.action_space_size - 1, or -1 if the action does not exist
        """
        for key, val in self.actions.items():
            if val == action:
                return key
        return -1

    def get_action_from_user(self):
        """
        Get an action from the user keyboard.
        If self.wait_for_explicit_human_action is set, this call blocks (polling the renderer events)
        until at least one key is pressed.
        :return: action index, or self.default_action if the pressed keys do not map to any action
        """
        if self.wait_for_explicit_human_action:
            while len(self.renderer.pressed_keys) == 0:
                self.renderer.get_events()

        if self.key_to_action == {}:
            # the keys are the numbers on the keyboard corresponding to the action index
            if len(self.renderer.pressed_keys) > 0:
                action_idx = self.renderer.pressed_keys[0] - ord("1")
                if 0 <= action_idx < self.action_space_size:
                    return action_idx
        else:
            # the keys are mapped through the environment to more intuitive keyboard keys.
            # the pressed keys must match a key combination exactly (order is ignored)
            pressed_keys = set(self.renderer.pressed_keys)
            for env_keys, action_idx in self.key_to_action.items():
                if set(env_keys) == pressed_keys:
                    return action_idx

        # return the default action so that the environment will continue running
        return self.default_action

    def step(self, action_idx):
        """
        Perform a single step on the environment using the given action
        :param action_idx: the action to perform on the environment
        :return: A dictionary containing the state, reward, done flag, action and info
        """
        self.last_action_idx = action_idx

        self._take_action(action_idx)

        self._update_state()

        # render before the preprocessing of the state
        if self.is_rendered:
            self.render()

        self.state = self._preprocess_state(self.state)

        return {'state': self.state,
                'reward': self.reward,
                'done': self.done,
                'action': self.last_action_idx,
                'info': self.info}

    def render(self):
        """
        Pass the image returned by get_rendered_image to the renderer for display
        """
        self.renderer.render_image(self.get_rendered_image())

    def reset(self, force_environment_reset=False):
        """
        Reset the environment and all the variable of the wrapper
        :param force_environment_reset: forces environment reset even when the game did not end
        :return: A dictionary containing the state, reward, done flag, action and info
        """
        self._restart_environment_episode(force_environment_reset)
        self.last_episode_time = time.time()
        self.done = False
        self.episode_idx += 1
        self.reward = 0.0
        self.last_action_idx = 0
        self._update_state()

        # render before the preprocessing of the state, so that the image will be in its original quality
        if self.is_rendered:
            self.render()

        # TODO BUG: if the environment has not been reset, _preprocessed_state will be running on an already preprocessed state
        # TODO: see also _update_state above
        self.state = self._preprocess_state(self.state)

        return {'state': self.state,
                'reward': self.reward,
                'done': self.done,
                'action': self.last_action_idx,
                'info': self.info}

    def get_random_action(self):
        """
        Returns an action picked uniformly from the available actions
        :return: for discrete controls, a random action index in [0, self.action_space_size);
                 otherwise, a uniform sample between self.action_space_low and self.action_space_high
        """
        if self.discrete_controls:
            return np.random.choice(self.action_space_size)
        else:
            return np.random.uniform(self.action_space_low, self.action_space_high)

    def change_phase(self, phase):
        """
        Change the current phase of the run.
        This is useful when different behavior is expected when testing and training
        :param phase: The running phase of the algorithm
        :type phase: RunPhase
        """
        self.phase = phase

    def get_available_keys(self):
        """
        Return a list of tuples mapping between action names and the keyboard key that triggers them
        :return: a list of tuples mapping between action names and the keyboard key that triggers them
        """
        available_keys = []
        if self.key_to_action != {}:
            # list the mapped key combinations, ordered by the action index they trigger
            for key, idx in sorted(self.key_to_action.items(), key=operator.itemgetter(1)):
                # the empty key combination is skipped since there is no key to display for it
                if key != ():
                    key_names = [self.renderer.get_key_names([k])[0] for k in key]
                    available_keys.append((self.actions_description[idx], ' + '.join(key_names)))
        elif self.discrete_controls:
            # no explicit mapping - the number keys 1..action_space_size select the actions
            for action in range(self.action_space_size):
                available_keys.append(("Action {}".format(action + 1), action + 1))
        return available_keys

    # The following functions define the interaction with the environment.
    # Any new environment that inherits the EnvironmentWrapper class should use these signatures.
    # Some of these functions are optional - please read their description for more details.

    def _take_action(self, action_idx):
        """
        An environment dependent function that sends an action to the simulator.
        :param action_idx: the action to perform on the environment
        :return: None
        """
        pass

    def _preprocess_state(self, state):
        """
        Do initial state preprocessing such as cropping, rgb2gray, rescale etc.
        Implementing this function is optional.
        :param state: a raw state from the environment
        :return: the preprocessed state
        """
        return state

    def _update_state(self):
        """
        Updates the state from the environment.
        Should update self.state, self.reward, self.done and self.info
        :return: None
        """
        pass

    def _restart_environment_episode(self, force_environment_reset=False):
        """
        An environment dependent function that restarts the episode in the simulator.
        :param force_environment_reset: Force the environment to reset even if the episode is not done yet.
        :return: None
        """
        pass

    def get_rendered_image(self):
        """
        Return a numpy array containing the image that will be rendered to the screen.
        This can be different from the state. For example, mujoco's state is a measurements vector.
        The default implementation expects self.state to be a mapping with an 'observation' entry.
        :return: numpy array containing the image that will be rendered to the screen
        """
        return self.state['observation']