mirror of
https://github.com/gryf/coach.git
synced 2025-12-17 19:20:19 +01:00
205 lines
9.9 KiB
Python
205 lines
9.9 KiB
Python
#
|
|
# Copyright (c) 2017 Intel Corporation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
from collections import OrderedDict
|
|
from typing import Union, List
|
|
|
|
import numpy as np
|
|
|
|
from rl_coach.agents.agent import Agent
|
|
from rl_coach.core_types import ActionInfo, StateType, Batch
|
|
from rl_coach.filters.filter import NoInputFilter
|
|
from rl_coach.logger import screen
|
|
from rl_coach.memories.non_episodic.prioritized_experience_replay import PrioritizedExperienceReplay
|
|
from rl_coach.spaces import DiscreteActionSpace
|
|
from copy import deepcopy, copy
|
|
|
|
|
|
## This is an abstract agent - there is no learn_from_batch method ##
|
|
|
|
|
|
class ValueOptimizationAgent(Agent):
|
|
def __init__(self, agent_parameters, parent: Union['LevelManager', 'CompositeAgent']=None):
|
|
super().__init__(agent_parameters, parent)
|
|
self.q_values = self.register_signal("Q")
|
|
self.q_value_for_action = {}
|
|
|
|
# currently we use softmax action probabilities only in batch-rl,
|
|
# but we might want to extend this later at some point.
|
|
self.should_get_softmax_probabilities = \
|
|
hasattr(self.ap.network_wrappers['main'], 'should_get_softmax_probabilities') and \
|
|
self.ap.network_wrappers['main'].should_get_softmax_probabilities
|
|
|
|
def init_environment_dependent_modules(self):
|
|
super().init_environment_dependent_modules()
|
|
if isinstance(self.spaces.action, DiscreteActionSpace):
|
|
for i in range(len(self.spaces.action.actions)):
|
|
self.q_value_for_action[i] = self.register_signal("Q for action {}".format(i),
|
|
dump_one_value_per_episode=False,
|
|
dump_one_value_per_step=True)
|
|
|
|
# Algorithms for which q_values are calculated from predictions will override this function
|
|
def get_all_q_values_for_states(self, states: StateType):
|
|
actions_q_values = None
|
|
if self.exploration_policy.requires_action_values():
|
|
actions_q_values = self.get_prediction(states)
|
|
|
|
return actions_q_values
|
|
|
|
def get_all_q_values_for_states_and_softmax_probabilities(self, states: StateType):
|
|
actions_q_values, softmax_probabilities = None, None
|
|
if self.exploration_policy.requires_action_values():
|
|
outputs = copy(self.networks['main'].online_network.outputs)
|
|
outputs.append(self.networks['main'].online_network.output_heads[0].softmax)
|
|
|
|
actions_q_values, softmax_probabilities = self.get_prediction(states, outputs=outputs)
|
|
return actions_q_values, softmax_probabilities
|
|
|
|
def get_prediction(self, states, outputs=None):
|
|
return self.networks['main'].online_network.predict(self.prepare_batch_for_inference(states, 'main'),
|
|
outputs=outputs)
|
|
|
|
def update_transition_priorities_and_get_weights(self, TD_errors, batch):
|
|
# update errors in prioritized replay buffer
|
|
importance_weights = None
|
|
if isinstance(self.memory, PrioritizedExperienceReplay):
|
|
self.call_memory('update_priorities', (batch.info('idx'), TD_errors))
|
|
importance_weights = batch.info('weight')
|
|
return importance_weights
|
|
|
|
def _validate_action(self, policy, action):
|
|
if np.array(action).shape != ():
|
|
raise ValueError((
|
|
'The exploration_policy {} returned a vector of actions '
|
|
'instead of a single action. ValueOptimizationAgents '
|
|
'require exploration policies which return a single action.'
|
|
).format(policy.__class__.__name__))
|
|
|
|
def choose_action(self, curr_state):
|
|
if self.should_get_softmax_probabilities:
|
|
actions_q_values, softmax_probabilities = \
|
|
self.get_all_q_values_for_states_and_softmax_probabilities(curr_state)
|
|
else:
|
|
actions_q_values = self.get_all_q_values_for_states(curr_state)
|
|
|
|
# choose action according to the exploration policy and the current phase (evaluating or training the agent)
|
|
action, action_probabilities = self.exploration_policy.get_action(actions_q_values)
|
|
if self.should_get_softmax_probabilities and softmax_probabilities is not None:
|
|
# override the exploration policy's generated probabilities when an action was taken
|
|
# with the agent's actual policy
|
|
action_probabilities = softmax_probabilities
|
|
|
|
self._validate_action(self.exploration_policy, action)
|
|
|
|
if actions_q_values is not None:
|
|
# this is for bootstrapped dqn
|
|
if type(actions_q_values) == list and len(actions_q_values) > 0:
|
|
actions_q_values = self.exploration_policy.last_action_values
|
|
|
|
# store the q values statistics for logging
|
|
self.q_values.add_sample(actions_q_values)
|
|
|
|
actions_q_values = actions_q_values.squeeze()
|
|
action_probabilities = action_probabilities.squeeze()
|
|
|
|
for i, q_value in enumerate(actions_q_values):
|
|
self.q_value_for_action[i].add_sample(q_value)
|
|
|
|
action_info = ActionInfo(action=action,
|
|
action_value=actions_q_values[action],
|
|
max_action_value=np.max(actions_q_values),
|
|
all_action_probabilities=action_probabilities)
|
|
|
|
else:
|
|
action_info = ActionInfo(action=action, all_action_probabilities=action_probabilities)
|
|
|
|
return action_info
|
|
|
|
def learn_from_batch(self, batch):
|
|
raise NotImplementedError("ValueOptimizationAgent is an abstract agent. Not to be used directly.")
|
|
|
|
def run_off_policy_evaluation(self):
|
|
"""
|
|
Run the off-policy evaluation estimators to get a prediction for the performance of the current policy based on
|
|
an evaluation dataset, which was collected by another policy(ies).
|
|
:return: None
|
|
"""
|
|
assert self.ope_manager
|
|
|
|
if not isinstance(self.pre_network_filter, NoInputFilter) and len(self.pre_network_filter.reward_filters) != 0:
|
|
raise ValueError("Defining a pre-network reward filter when OPEs are calculated will result in a mismatch "
|
|
"between q values (which are scaled), and actual rewards, which are not. It is advisable "
|
|
"to use an input_filter, if possible, instead, which will filter the transitions directly "
|
|
"in the replay buffer, affecting both the q_values and the rewards themselves. ")
|
|
|
|
ips, dm, dr, seq_dr, wis = self.ope_manager.evaluate(
|
|
evaluation_dataset_as_episodes=self.memory.evaluation_dataset_as_episodes,
|
|
evaluation_dataset_as_transitions=self.memory.evaluation_dataset_as_transitions,
|
|
batch_size=self.ap.network_wrappers['main'].batch_size,
|
|
discount_factor=self.ap.algorithm.discount,
|
|
q_network=self.networks['main'].online_network,
|
|
network_keys=list(self.ap.network_wrappers['main'].input_embedders_parameters.keys()))
|
|
|
|
# get the estimators out to the screen
|
|
log = OrderedDict()
|
|
log['Epoch'] = self.training_epoch
|
|
log['IPS'] = ips
|
|
log['DM'] = dm
|
|
log['DR'] = dr
|
|
log['WIS'] = wis
|
|
log['Sequential-DR'] = seq_dr
|
|
screen.log_dict(log, prefix='Off-Policy Evaluation')
|
|
|
|
# get the estimators out to dashboard
|
|
self.agent_logger.set_current_time(self.get_current_time() + 1)
|
|
self.agent_logger.create_signal_value('Inverse Propensity Score', ips)
|
|
self.agent_logger.create_signal_value('Direct Method Reward', dm)
|
|
self.agent_logger.create_signal_value('Doubly Robust', dr)
|
|
self.agent_logger.create_signal_value('Sequential Doubly Robust', seq_dr)
|
|
self.agent_logger.create_signal_value('Weighted Importance Sampling', wis)
|
|
|
|
def get_reward_model_loss(self, batch: Batch):
|
|
network_keys = self.ap.network_wrappers['reward_model'].input_embedders_parameters.keys()
|
|
current_rewards_prediction_for_all_actions = self.networks['reward_model'].online_network.predict(
|
|
batch.states(network_keys))
|
|
current_rewards_prediction_for_all_actions[range(batch.size), batch.actions()] = batch.rewards()
|
|
|
|
return self.networks['reward_model'].train_and_sync_networks(
|
|
batch.states(network_keys), current_rewards_prediction_for_all_actions)[0]
|
|
|
|
def improve_reward_model(self, epochs: int):
|
|
"""
|
|
Train a reward model to be used by the doubly-robust estimator
|
|
|
|
:param epochs: The total number of epochs to use for training a reward model
|
|
:return: None
|
|
"""
|
|
batch_size = self.ap.network_wrappers['reward_model'].batch_size
|
|
|
|
# this is fitted from the training dataset
|
|
for epoch in range(epochs):
|
|
loss = 0
|
|
total_transitions_processed = 0
|
|
for i, batch in enumerate(self.call_memory('get_shuffled_training_data_generator', batch_size)):
|
|
batch = Batch(batch)
|
|
loss += self.get_reward_model_loss(batch)
|
|
total_transitions_processed += batch.size
|
|
|
|
log = OrderedDict()
|
|
log['Epoch'] = epoch
|
|
log['loss'] = loss / total_transitions_processed
|
|
screen.log_dict(log, prefix='Training Reward Model')
|
|
|