1
0
mirror of https://github.com/gryf/coach.git synced 2025-12-17 11:10:20 +01:00
Files
coach/rl_coach/agents/pal_agent.py
2018-08-13 17:11:34 +03:00

95 lines
4.2 KiB
Python

#
# Copyright (c) 2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Union
import numpy as np
from rl_coach.agents.dqn_agent import DQNAgentParameters, DQNAlgorithmParameters
from rl_coach.agents.value_optimization_agent import ValueOptimizationAgent
from rl_coach.memories.episodic.episodic_experience_replay import EpisodicExperienceReplay, \
EpisodicExperienceReplayParameters
class PALAlgorithmParameters(DQNAlgorithmParameters):
    """Algorithm settings for the PAL agent, extending the DQN algorithm parameters.

    Attributes added on top of the parent's:
        pal_alpha: multiplier of the advantage-learning correction that
            PALAgent subtracts from the TD target (set to 0.9 here).
        persistent_advantage_learning: when truthy, PALAgent uses the smaller
            of the current-state and next-state gaps instead of the
            current-state gap alone (set to False here).
        monte_carlo_mixing_rate: weight given to the Monte Carlo return when
            PALAgent blends it with the TD target (set to 0.1 here).
    """

    def __init__(self):
        # Let the DQN parameters initialise first, then add the PAL-only knobs.
        super().__init__()

        # Strength of the advantage-learning correction.
        self.pal_alpha = 0.9

        # Regular advantage learning unless explicitly switched to persistent.
        self.persistent_advantage_learning = False

        # Share of the Monte Carlo return in the final training target.
        self.monte_carlo_mixing_rate = 0.1
class PALAgentParameters(DQNAgentParameters):
    """Agent parameters for PALAgent.

    Starts from the DQN agent parameters, then swaps in the PAL algorithm
    parameters and an episodic experience-replay memory configuration.
    """

    def __init__(self):
        super().__init__()

        # PAL-specific algorithm knobs replace whatever the parent configured.
        self.algorithm = PALAlgorithmParameters()

        # PALAgent reads batch.total_returns(), hence the episodic replay
        # memory (presumably what provides per-episode returns - confirm).
        self.memory = EpisodicExperienceReplayParameters()

    @property
    def path(self):
        """Return the 'module:class' locator string that names PALAgent."""
        return 'rl_coach.agents.pal_agent:PALAgent'
# Persistent Advantage Learning - https://arxiv.org/pdf/1512.04860.pdf
class PALAgent(ValueOptimizationAgent):
    """Value-optimization agent implementing (Persistent) Advantage Learning.

    For the action taken in each transition the training target is built as:

    1. a bootstrap target in which the online network picks the next action
       and the target network evaluates it,
    2. minus ``alpha`` times an action-gap term (``V(s) - Q(s, a)`` on the
       target network, or the smaller of that and the next-state gap when the
       persistent variant is enabled),
    3. blended with the transition's Monte Carlo return using
       ``monte_carlo_mixing_rate``.

    All other actions keep the online network's own prediction as their
    target, so they contribute no error.
    """

    def __init__(self, agent_parameters, parent: Union['LevelManager', 'CompositeAgent']=None):
        """Build the agent and cache the PAL settings.

        :param agent_parameters: parameters object whose ``algorithm``
            attribute provides ``pal_alpha``, ``persistent_advantage_learning``
            and ``monte_carlo_mixing_rate``.
        :param parent: forwarded unchanged to the base-class constructor.
        """
        super().__init__(agent_parameters, parent)
        algorithm = agent_parameters.algorithm
        self.alpha = algorithm.pal_alpha
        self.persistent = algorithm.persistent_advantage_learning
        self.monte_carlo_mixing_rate = algorithm.monte_carlo_mixing_rate

    def learn_from_batch(self, batch):
        """Compute the PAL targets for ``batch`` and train the main network.

        :param batch: object exposing ``states(keys)``, ``next_states(keys)``,
            ``actions()``, ``rewards()``, ``game_overs()`` and
            ``total_returns()``, each indexable per transition.
        :return: tuple ``(total_loss, losses, unclipped_grads)`` - the first
            three items returned by ``train_and_sync_networks``.
        """
        main_network = self.networks['main']
        network_keys = self.ap.network_wrappers['main'].input_embedders_parameters.keys()

        # next state values
        next_states = batch.next_states(network_keys)
        q_st_plus_1_target, q_st_plus_1_online = main_network.parallel_prediction([
            (main_network.target_network, next_states),
            (main_network.online_network, next_states)
        ])
        # The online network chooses the next action; the target network scores it.
        selected_actions = np.argmax(q_st_plus_1_online, 1)
        v_st_plus_1_target = np.max(q_st_plus_1_target, 1)

        # current state values
        states = batch.states(network_keys)
        q_st_target, q_st_online = main_network.parallel_prediction([
            (main_network.target_network, states),
            (main_network.online_network, states)
        ])
        v_st_target = np.max(q_st_target, 1)

        # Fetch the per-transition data once instead of once (or more) per row.
        actions = batch.actions()
        rewards = batch.rewards()
        game_overs = batch.game_overs()
        total_returns = batch.total_returns()
        discount = self.ap.algorithm.discount
        mixing_rate = self.monte_carlo_mixing_rate

        # calculate TD error
        # Starting from the online predictions leaves untaken actions with zero error.
        TD_targets = np.copy(q_st_online)

        # Iterate over the rows that were actually predicted rather than the
        # configured batch size, so a shorter batch does not raise IndexError
        # and a longer one does not silently skip its trailing rows.
        for i in range(TD_targets.shape[0]):
            action = actions[i]
            next_action = selected_actions[i]

            # Bootstrap target; (1 - game_over) drops the bootstrap term on terminal transitions.
            TD_targets[i, action] = rewards[i] + \
                (1.0 - game_overs[i]) * discount * q_st_plus_1_target[i][next_action]

            advantage_learning_update = v_st_target[i] - q_st_target[i, action]

            # Persistent Advantage Learning or Regular Advantage Learning
            if self.persistent:
                # NOTE(review): the next-state gap is taken at the online-greedy
                # next action; confirm this matches the intended definition in
                # the paper (which may use the action taken in the current state).
                next_advantage_learning_update = \
                    v_st_plus_1_target[i] - q_st_plus_1_target[i, next_action]
                TD_targets[i, action] -= self.alpha * min(advantage_learning_update,
                                                          next_advantage_learning_update)
            else:
                TD_targets[i, action] -= self.alpha * advantage_learning_update

            # mixing monte carlo updates
            monte_carlo_target = total_returns[i]
            TD_targets[i, action] = (1 - mixing_rate) * TD_targets[i, action] \
                + mixing_rate * monte_carlo_target

        result = main_network.train_and_sync_networks(states, TD_targets)
        total_loss, losses, unclipped_grads = result[:3]

        return total_loss, losses, unclipped_grads