# rl_coach/graph_managers/batch_rl_graph_manager.py
#
# Copyright (c) 2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from copy import deepcopy
from typing import Tuple, List, Union
from rl_coach.agents.dqn_agent import DQNAgentParameters
from rl_coach.agents.nec_agent import NECAgentParameters
from rl_coach.architectures.network_wrapper import NetworkWrapper
from rl_coach.base_parameters import AgentParameters, VisualizationParameters, TaskParameters, \
PresetValidationParameters
from rl_coach.core_types import RunPhase, TotalStepsCounter, TrainingSteps, EnvironmentEpisodes, EnvironmentSteps
from rl_coach.environments.environment import EnvironmentParameters, Environment
from rl_coach.graph_managers.graph_manager import ScheduleParameters
from rl_coach.graph_managers.basic_rl_graph_manager import BasicRLGraphManager
from rl_coach.level_manager import LevelManager
from rl_coach.logger import screen
from rl_coach.schedules import LinearSchedule
from rl_coach.spaces import SpacesDefinition
from rl_coach.utils import short_dynamic_import
from rl_coach.memories.episodic import EpisodicExperienceReplayParameters
from rl_coach.core_types import TimeTypes


class BatchRLGraphManager(BasicRLGraphManager):
"""
    A batch RL graph manager sets up a scenario of learning from a dataset, without a simulator.
    If an environment is given (useful for research purposes, or for experimenting with a toy problem before working
    with a real dataset), it is used to collect a dataset that will later be used to train the actual agent. In that
    case, the dataset can be collected either by acting randomly in the environment (running only in heatup), or by
    training a different agent in the environment and using its collected experience. If experience-generating agent
    parameters are given, that agent is instantiated and trained on the environment, and its experience is then used
    to train the actual agent; otherwise, a random dataset is collected.

:param agent_params: the parameters of the agent to train using batch RL
:param env_params: [optional] environment parameters, for cases where we want to first collect a dataset
:param vis_params: visualization parameters
:param preset_validation_params: preset validation parameters, to be used for testing purposes
:param name: graph name
:param spaces_definition: when working with a dataset, we need to get a description of the actual state and action
spaces of the problem
:param reward_model_num_epochs: the number of epochs to go over the dataset for training a reward model for the
'direct method' and 'doubly robust' OPE methods.
    :param train_to_eval_ratio: the fraction of the dataset's transitions to use for training vs. evaluation, i.e. a
                                value of 0.8 means ~80% of the transitions will be used for training and ~20% will be
                                used for evaluation using OPE
    :param experience_generating_agent_params: [optional] parameters of an agent to be trained against an environment,
                                               whose collected experience will be used to train the actual (target)
                                               agent
:param experience_generating_schedule_params: [optional] graph scheduling parameters for training the experience
generating agent
"""
def __init__(self, agent_params: AgentParameters,
env_params: Union[EnvironmentParameters, None],
schedule_params: ScheduleParameters,
vis_params: VisualizationParameters = VisualizationParameters(),
preset_validation_params: PresetValidationParameters = PresetValidationParameters(),
name='batch_rl_graph', spaces_definition: SpacesDefinition = None, reward_model_num_epochs: int = 100,
train_to_eval_ratio: float = 0.8, experience_generating_agent_params: AgentParameters = None,
experience_generating_schedule_params: ScheduleParameters = None):
super().__init__(agent_params, env_params, schedule_params, vis_params, preset_validation_params, name)
self.is_batch_rl = True
self.time_metric = TimeTypes.Epoch
self.reward_model_num_epochs = reward_model_num_epochs
self.spaces_definition = spaces_definition
self.is_collecting_random_dataset = experience_generating_agent_params is None
        # set this here to make sure that, by default, train_to_eval_ratio gets a value < 1
        # (its default value in the memory is 1, so as not to affect other, non-batch-RL scenarios)
if self.is_collecting_random_dataset:
self.agent_params.memory.train_to_eval_ratio = train_to_eval_ratio
else:
experience_generating_agent_params.memory.train_to_eval_ratio = train_to_eval_ratio
self.experience_generating_agent_params = experience_generating_agent_params
self.experience_generating_agent = None
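        # note: the schedule for training the actual agent is kept in self.schedule_params; when an
        # experience-generating agent is used, improve() switches back to it once the dataset has been collected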
self.set_schedule_params(experience_generating_schedule_params)
self.schedule_params = schedule_params

    def _create_graph(self, task_parameters: TaskParameters) -> Tuple[List[LevelManager], List[Environment]]:
if self.env_params:
# environment loading
self.env_params.seed = task_parameters.seed
self.env_params.experiment_path = task_parameters.experiment_path
env = short_dynamic_import(self.env_params.path)(**self.env_params.__dict__,
visualization_parameters=self.visualization_parameters)
else:
env = None
        # Only DQN variants and NEC are supported at this point.
        assert isinstance(self.agent_params, (DQNAgentParameters, NECAgentParameters))

        # Only episodic memories are supported, since they are needed for evaluating the sequential
        # doubly robust estimator.
        assert isinstance(self.agent_params.memory, EpisodicExperienceReplayParameters)
# agent loading
self.agent_params.task_parameters = task_parameters # TODO: this should probably be passed in a different way
self.agent_params.name = "agent"
self.agent_params.is_batch_rl_training = True
self.agent_params.network_wrappers['main'].should_get_softmax_probabilities = True
if 'reward_model' not in self.agent_params.network_wrappers:
# user hasn't defined params for the reward model. we will use the same params as used for the 'main'
# network.
self.agent_params.network_wrappers['reward_model'] = deepcopy(self.agent_params.network_wrappers['main'])
self.agent = short_dynamic_import(self.agent_params.path)(self.agent_params)
agents = {'agent': self.agent}
if not self.is_collecting_random_dataset:
self.experience_generating_agent_params.visualization.dump_csv = False
self.experience_generating_agent_params.task_parameters = task_parameters
self.experience_generating_agent_params.name = "experience_gen_agent"
self.experience_generating_agent_params.network_wrappers['main'].should_get_softmax_probabilities = True
            # we need to set these manually, since they are normally set automatically only for the default agent
self.experience_generating_agent_params.input_filter = self.agent_params.input_filter
self.experience_generating_agent_params.output_filter = self.agent_params.output_filter
self.experience_generating_agent = short_dynamic_import(
self.experience_generating_agent_params.path)(self.experience_generating_agent_params)
agents['experience_generating_agent'] = self.experience_generating_agent
if not env and not self.agent_params.memory.load_memory_from_file_path:
screen.warning("A BatchRLGraph requires setting a dataset to load into the agent's memory or alternatively "
"using an environment to create a (random) dataset from. This agent should only be used for "
"inference. ")
        # set level manager
        # Note: although each agent is used separately, both agents must be registered with the LevelManager so that
        # they are properly initialized.
level_manager = LevelManager(agents=agents,
environment=env, name="main_level",
spaces_definition=self.spaces_definition)
if env:
return [level_manager], [env]
else:
return [level_manager], []
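
    # When training purely from a dataset (no environment), the state and action spaces have to be described
    # explicitly through spaces_definition. A sketch, assuming a vector observation of size 4 and 2 discrete actions
    # (both sizes are placeholders):
    #
    #   from rl_coach.spaces import StateSpace, VectorObservationSpace, DiscreteActionSpace, RewardSpace
    #   spaces_definition = SpacesDefinition(state=StateSpace({'observation': VectorObservationSpace(shape=4)}),
    #                                        goal=None,
    #                                        action=DiscreteActionSpace(2),
    #                                        reward=RewardSpace(1))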

    def improve(self):
"""
The main loop of the run.
Defined in the following steps:
1. Heatup
2. Repeat:
2.1. Repeat:
2.1.1. Train
2.1.2. Possibly save checkpoint
2.2. Evaluate
:return: None
"""
self.verify_graph_was_created()
# initialize the network parameters from the global network
self.sync()
# If we have both an environment and a dataset to load from, we will use the environment only for
# evaluating the policy, and will not run heatup. If no dataset is available to load from, we will be collecting
# a dataset from an environment.
if not self.agent_params.memory.load_memory_from_file_path:
if self.is_collecting_random_dataset:
# heatup
if self.env_params is not None:
screen.log_title(
"Collecting random-action experience to use for training the actual agent in a Batch RL "
"fashion")
# Creating a random dataset during the heatup phase is useful mainly for tutorial and debug
# purposes.
self.heatup(self.heatup_steps)
else:
screen.log_title(
"Starting to improve an agent collecting experience to use for training the actual agent in a "
"Batch RL fashion")
# set the experience generating agent to train
self.level_managers[0].agents = {'experience_generating_agent': self.experience_generating_agent}
# collect a dataset using the experience generating agent
super().improve()
# set the acquired experience to the actual agent that we're going to train
self.agent.memory = self.experience_generating_agent.memory
# switch the graph scheduling parameters
self.set_schedule_params(self.schedule_params)
# set the actual agent to train
self.level_managers[0].agents = {'agent': self.agent}
# this agent never actually plays
self.level_managers[0].agents['agent'].ap.algorithm.num_consecutive_playing_steps = EnvironmentSteps(0)
        # from this point onwards the dataset cannot be changed anymore, which allows for some performance
        # improvements
self.level_managers[0].agents['agent'].freeze_memory()
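        # train the reward model and gather the static statistics needed by the off-policy evaluation (OPE) estimators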
self.initialize_ope_models_and_stats()
# improve
if self.task_parameters.task_index is not None:
screen.log_title("Starting to improve {} task index {}".format(self.name, self.task_parameters.task_index))
else:
screen.log_title("Starting to improve {}".format(self.name))
        # the outermost training loop
improve_steps_end = self.total_steps_counters[RunPhase.TRAIN] + self.improve_steps
while self.total_steps_counters[RunPhase.TRAIN] < improve_steps_end:
# perform several steps of training
if self.steps_between_evaluation_periods.num_steps > 0:
with self.phase_context(RunPhase.TRAIN):
self.reset_internal_state(force_environment_reset=True)
steps_between_evaluation_periods_end = self.current_step_counter + \
self.steps_between_evaluation_periods
while self.current_step_counter < steps_between_evaluation_periods_end:
self.train()
            # the output of batch RL training is always a checkpoint of the trained agent, so we save a checkpoint
            # every epoch, regardless of the user's command line arguments
self.save_checkpoint()
# run off-policy evaluation estimators to evaluate the agent's performance against the dataset
self.run_off_policy_evaluation()
if self.env_params is not None and self.evaluate(self.evaluation_steps):
# if we do have a simulator (although we are in a batch RL setting we might have a simulator, e.g. when
# demonstrating the batch RL use-case using one of the existing Coach environments),
# we might want to evaluate vs. the simulator every now and then.
break

    def initialize_ope_models_and_stats(self):
"""
Improve a reward model of the MDP, to be used for some of the off-policy evaluation (OPE) methods.
e.g. 'direct method' and 'doubly robust'.
"""
agent = self.level_managers[0].agents['agent']
# prepare dataset to be consumed in the expected formats for OPE
agent.memory.prepare_evaluation_dataset()
screen.log_title("Training a regression model for estimating MDP rewards")
agent.improve_reward_model(epochs=self.reward_model_num_epochs)
screen.log_title("Collecting static statistics for OPE")
        agent.ope_manager.gather_static_shared_stats(
            evaluation_dataset_as_transitions=agent.memory.evaluation_dataset_as_transitions,
            batch_size=agent.ap.network_wrappers['main'].batch_size,
            reward_model=agent.networks['reward_model'].online_network,
            network_keys=list(agent.ap.network_wrappers['main'].input_embedders_parameters.keys()))

    def run_off_policy_evaluation(self):
"""
        Run the off-policy evaluation (OPE) estimators to evaluate the trained policy's performance against the dataset
        :return: None
"""
self.level_managers[0].agents['agent'].run_off_policy_evaluation()
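
# Typical driver sequence (a sketch; in practice the Coach runner builds `task_parameters` and makes these calls,
# this is not an API defined in this module):
#
#   graph_manager.create_graph(task_parameters)
#   graph_manager.improve()   # trains, saves a checkpoint and runs OPE every epoch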