mirror of
https://github.com/gryf/coach.git
synced 2025-12-18 19:50:17 +01:00
* Changes required for Continuous PPO Head with MXNet. Used in MountainCarContinuous_ClippedPPO. * Simplified changes for continuous ppo. * Cleaned up to avoid duplicate code, and simplified covariance creation.
673 lines
33 KiB
Python
673 lines
33 KiB
Python
from typing import List, Tuple, Union
|
|
from types import ModuleType
|
|
|
|
import math
|
|
import mxnet as mx
|
|
from mxnet.gluon import nn
|
|
from rl_coach.base_parameters import AgentParameters
|
|
from rl_coach.core_types import ActionProbabilities
|
|
from rl_coach.spaces import SpacesDefinition, BoxActionSpace, DiscreteActionSpace
|
|
from rl_coach.utils import eps
|
|
from rl_coach.architectures.mxnet_components.heads.head import Head, HeadLoss, LossInputSchema
|
|
from rl_coach.architectures.mxnet_components.heads.head import LOSS_OUT_TYPE_LOSS, LOSS_OUT_TYPE_REGULARIZATION
|
|
from rl_coach.architectures.mxnet_components.utils import hybrid_clip
|
|
|
|
|
|
LOSS_OUT_TYPE_KL = 'kl_divergence'
|
|
LOSS_OUT_TYPE_ENTROPY = 'entropy'
|
|
LOSS_OUT_TYPE_LIKELIHOOD_RATIO = 'likelihood_ratio'
|
|
LOSS_OUT_TYPE_CLIPPED_LIKELIHOOD_RATIO = 'clipped_likelihood_ratio'
|
|
|
|
nd_sym_type = Union[mx.nd.NDArray, mx.sym.Symbol]
|
|
|
|
|
|
class MultivariateNormalDist:
|
|
def __init__(self,
|
|
num_var: int,
|
|
mean: nd_sym_type,
|
|
sigma: nd_sym_type,
|
|
F: ModuleType=mx.nd) -> None:
|
|
"""
|
|
Distribution object for Multivariate Normal. Works with batches.
|
|
Optionally works with batches and time steps, but be consistent in usage: i.e. if using time_step,
|
|
mean, sigma and data for log_prob must all include a time_step dimension.
|
|
|
|
:param num_var: number of variables in distribution
|
|
:param mean: mean for each variable,
|
|
of shape (num_var) or
|
|
of shape (batch_size, num_var) or
|
|
of shape (batch_size, time_step, num_var).
|
|
:param sigma: covariance matrix,
|
|
of shape (num_var, num_var) or
|
|
of shape (batch_size, num_var, num_var) or
|
|
of shape (batch_size, time_step, num_var, num_var).
|
|
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
|
|
"""
|
|
self.num_var = num_var
|
|
self.mean = mean
|
|
self.sigma = sigma
|
|
self.F = F
|
|
|
|
def inverse_using_cholesky(self, matrix: nd_sym_type) -> nd_sym_type:
|
|
"""
|
|
Calculate inverses for a batch of matrices using Cholesky decomposition method.
|
|
|
|
:param matrix: matrix (or matrices) to invert,
|
|
of shape (num_var, num_var) or
|
|
of shape (batch_size, num_var, num_var) or
|
|
of shape (batch_size, time_step, num_var, num_var).
|
|
:return: inverted matrix (or matrices),
|
|
of shape (num_var, num_var) or
|
|
of shape (batch_size, num_var, num_var) or
|
|
of shape (batch_size, time_step, num_var, num_var).
|
|
"""
|
|
cholesky_factor = self.F.linalg.potrf(matrix)
|
|
return self.F.linalg.potri(cholesky_factor)
|
|
|
|
def log_det(self, matrix: nd_sym_type) -> nd_sym_type:
|
|
"""
|
|
Calculate log of the determinant for a batch of matrices using Cholesky decomposition method.
|
|
|
|
:param matrix: matrix (or matrices) to invert,
|
|
of shape (num_var, num_var) or
|
|
of shape (batch_size, num_var, num_var) or
|
|
of shape (batch_size, time_step, num_var, num_var).
|
|
:return: inverted matrix (or matrices),
|
|
of shape (num_var, num_var) or
|
|
of shape (batch_size, num_var, num_var) or
|
|
of shape (batch_size, time_step, num_var, num_var).
|
|
"""
|
|
cholesky_factor = self.F.linalg.potrf(matrix)
|
|
return 2 * self.F.linalg.sumlogdiag(cholesky_factor)
|
|
|
|
def log_prob(self, x: nd_sym_type) -> nd_sym_type:
|
|
"""
|
|
Calculate the log probability of data given the current distribution.
|
|
|
|
See http://www.notenoughthoughts.net/posts/normal-log-likelihood-gradient.html
|
|
and https://discuss.mxnet.io/t/multivariate-gaussian-log-density-operator/1169/7
|
|
|
|
:param x: input data,
|
|
of shape (num_var) or
|
|
of shape (batch_size, num_var) or
|
|
of shape (batch_size, time_step, num_var).
|
|
:return: log_probability,
|
|
of shape (1) or
|
|
of shape (batch_size) or
|
|
of shape (batch_size, time_step).
|
|
"""
|
|
a = (self.num_var / 2) * math.log(2 * math.pi)
|
|
log_det_sigma = self.log_det(self.sigma)
|
|
b = (1 / 2) * log_det_sigma
|
|
sigma_inv = self.inverse_using_cholesky(self.sigma)
|
|
# deviation from mean, and dev_t is equivalent to transpose on last two dims.
|
|
dev = (x - self.mean).expand_dims(-1)
|
|
dev_t = (x - self.mean).expand_dims(-2)
|
|
|
|
# since batch_dot only works with ndarrays with ndim of 3,
|
|
# and we could have ndarrays with ndim of 4,
|
|
# we flatten batch_size and time_step into single dim.
|
|
dev_flat = dev.reshape(shape=(-1, 0, 0), reverse=1)
|
|
sigma_inv_flat = sigma_inv.reshape(shape=(-1, 0, 0), reverse=1)
|
|
dev_t_flat = dev_t.reshape(shape=(-1, 0, 0), reverse=1)
|
|
c = (1 / 2) * self.F.batch_dot(self.F.batch_dot(dev_t_flat, sigma_inv_flat), dev_flat)
|
|
# and now reshape back to (batch_size, time_step) if required.
|
|
c = c.reshape_like(b)
|
|
|
|
log_likelihood = -a - b - c
|
|
return log_likelihood
|
|
|
|
def entropy(self) -> nd_sym_type:
|
|
"""
|
|
Calculate entropy of current distribution.
|
|
|
|
See http://www.nowozin.net/sebastian/blog/the-entropy-of-a-normal-distribution.html
|
|
:return: entropy,
|
|
of shape (1) or
|
|
of shape (batch_size) or
|
|
of shape (batch_size, time_step).
|
|
"""
|
|
# todo: check if differential entropy is correct
|
|
log_det_sigma = self.log_det(self.sigma)
|
|
return (self.num_var / 2) + ((self.num_var / 2) * math.log(2 * math.pi)) + ((1 / 2) * log_det_sigma)
|
|
|
|
def kl_div(self, alt_dist) -> nd_sym_type:
|
|
"""
|
|
Calculated KL-Divergence with another MultivariateNormalDist distribution
|
|
See https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence
|
|
Specifically https://wikimedia.org/api/rest_v1/media/math/render/svg/a3bf3b4917bd1fcb8be48d6d6139e2e387bdc7d3
|
|
|
|
:param alt_dist: alternative distribution used for kl divergence calculation
|
|
:type alt_dist: MultivariateNormalDist
|
|
:return: KL-Divergence, of shape (1,)
|
|
"""
|
|
sigma_a_inv = self.F.linalg.potri(self.F.linalg.potrf(self.sigma))
|
|
sigma_b_inv = self.F.linalg.potri(self.F.linalg.potrf(alt_dist.sigma))
|
|
term1a = mx.nd.batch_dot(sigma_b_inv, self.sigma)
|
|
# sum of diagonal for batch of matrices
|
|
term1 = (self.F.eye(self.num_var).broadcast_like(term1a) * term1a).sum(axis=-1).sum(axis=-1)
|
|
mean_diff = (alt_dist.mean - self.mean).expand_dims(-1)
|
|
mean_diff_t = (alt_dist.mean - self.mean).expand_dims(-2)
|
|
term2 = self.F.batch_dot(self.F.batch_dot(mean_diff_t, sigma_b_inv), mean_diff).reshape_like(term1)
|
|
term3 = (2 * self.F.linalg.sumlogdiag(self.F.linalg.potrf(alt_dist.sigma))) -\
|
|
(2 * self.F.linalg.sumlogdiag(self.F.linalg.potrf(self.sigma)))
|
|
return 0.5 * (term1 + term2 - self.num_var + term3)
|
|
|
|
|
|
|
|
class CategoricalDist:
|
|
def __init__(self, n_classes: int, probs: nd_sym_type, F: ModuleType=mx.nd) -> None:
|
|
"""
|
|
Distribution object for Categorical data.
|
|
Optionally works with batches and time steps, but be consistent in usage: i.e. if using time_step,
|
|
mean, sigma and data for log_prob must all include a time_step dimension.
|
|
|
|
:param n_classes: number of classes in distribution
|
|
:param probs: probabilities for each class,
|
|
of shape (n_classes),
|
|
of shape (batch_size, n_classes) or
|
|
of shape (batch_size, time_step, n_classes)
|
|
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
|
|
"""
|
|
self.n_classes = n_classes
|
|
self.probs = probs
|
|
self.F = F
|
|
|
|
|
|
def log_prob(self, actions: nd_sym_type) -> nd_sym_type:
|
|
"""
|
|
Calculate the log probability of data given the current distribution.
|
|
|
|
:param actions: actions, with int8 data type,
|
|
of shape (1) if probs was (n_classes),
|
|
of shape (batch_size) if probs was (batch_size, n_classes) and
|
|
of shape (batch_size, time_step) if probs was (batch_size, time_step, n_classes)
|
|
:return: log_probability,
|
|
of shape (1) if probs was (n_classes),
|
|
of shape (batch_size) if probs was (batch_size, n_classes) and
|
|
of shape (batch_size, time_step) if probs was (batch_size, time_step, n_classes)
|
|
"""
|
|
action_mask = actions.one_hot(depth=self.n_classes)
|
|
action_probs = (self.probs * action_mask).sum(axis=-1)
|
|
return action_probs.log()
|
|
|
|
def entropy(self) -> nd_sym_type:
|
|
"""
|
|
Calculate entropy of current distribution.
|
|
|
|
:return: entropy,
|
|
of shape (1) if probs was (n_classes),
|
|
of shape (batch_size) if probs was (batch_size, n_classes) and
|
|
of shape (batch_size, time_step) if probs was (batch_size, time_step, n_classes)
|
|
"""
|
|
# todo: look into numerical stability
|
|
return -(self.probs.log()*self.probs).sum(axis=-1)
|
|
|
|
def kl_div(self, alt_dist) -> nd_sym_type:
|
|
"""
|
|
Calculated KL-Divergence with another Categorical distribution
|
|
|
|
:param alt_dist: alternative distribution used for kl divergence calculation
|
|
:type alt_dist: CategoricalDist
|
|
:return: KL-Divergence
|
|
"""
|
|
logits_a = self.probs.clip(a_min=eps, a_max=1 - eps).log()
|
|
logits_b = alt_dist.probs.clip(a_min=eps, a_max=1 - eps).log()
|
|
t = self.probs * (logits_a - logits_b)
|
|
t = self.F.where(condition=(alt_dist.probs == 0), x=self.F.ones_like(alt_dist.probs) * math.inf, y=t)
|
|
t = self.F.where(condition=(self.probs == 0), x=self.F.zeros_like(self.probs), y=t)
|
|
return t.sum(axis=-1)
|
|
|
|
|
|
class DiscretePPOHead(nn.HybridBlock):
|
|
def __init__(self, num_actions: int) -> None:
|
|
"""
|
|
Head block for Discrete Proximal Policy Optimization, to calculate probabilities for each action given
|
|
middleware representation of the environment state.
|
|
|
|
:param num_actions: number of actions in action space.
|
|
"""
|
|
super(DiscretePPOHead, self).__init__()
|
|
with self.name_scope():
|
|
self.dense = nn.Dense(units=num_actions, flatten=False)
|
|
|
|
def hybrid_forward(self, F: ModuleType, x: nd_sym_type) -> nd_sym_type:
|
|
"""
|
|
Used for forward pass through head network.
|
|
|
|
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
|
|
:param x: middleware state representation,
|
|
of shape (batch_size, in_channels) or
|
|
of shape (batch_size, time_step, in_channels).
|
|
:return: batch of probabilities for each action,
|
|
of shape (batch_size, num_actions) or
|
|
of shape (batch_size, time_step, num_actions).
|
|
"""
|
|
policy_values = self.dense(x)
|
|
policy_probs = F.softmax(policy_values)
|
|
return policy_probs
|
|
|
|
|
|
class ContinuousPPOHead(nn.HybridBlock):
|
|
def __init__(self, num_actions: int) -> None:
|
|
"""
|
|
Head block for Continuous Proximal Policy Optimization, to calculate probabilities for each action given
|
|
middleware representation of the environment state.
|
|
|
|
:param num_actions: number of actions in action space.
|
|
"""
|
|
super(ContinuousPPOHead, self).__init__()
|
|
with self.name_scope():
|
|
# todo: change initialization strategy
|
|
self.dense = nn.Dense(units=num_actions, flatten=False)
|
|
# all samples (across batch, and time step) share the same covariance, which is learnt,
|
|
# but since we assume the action probability variables are independent,
|
|
# only the diagonal entries of the covariance matrix are specified.
|
|
self.log_std = self.params.get('log_std',
|
|
shape=(num_actions,),
|
|
init=mx.init.Zero(),
|
|
allow_deferred_init=True)
|
|
# todo: is_local?
|
|
|
|
def hybrid_forward(self, F: ModuleType, x: nd_sym_type, log_std: nd_sym_type) -> Tuple[nd_sym_type, nd_sym_type]:
|
|
"""
|
|
Used for forward pass through head network.
|
|
|
|
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
|
|
:param x: middleware state representation,
|
|
of shape (batch_size, in_channels) or
|
|
of shape (batch_size, time_step, in_channels).
|
|
:return: batch of probabilities for each action,
|
|
of shape (batch_size, action_mean) or
|
|
of shape (batch_size, time_step, action_mean).
|
|
"""
|
|
policy_means = self.dense(x)
|
|
policy_std = log_std.exp().expand_dims(0).broadcast_like(policy_means)
|
|
return policy_means, policy_std
|
|
|
|
|
|
class ClippedPPOLossDiscrete(HeadLoss):
|
|
def __init__(self,
|
|
num_actions: int,
|
|
clip_likelihood_ratio_using_epsilon: float,
|
|
beta: float=0,
|
|
use_kl_regularization: bool=False,
|
|
initial_kl_coefficient: float=1,
|
|
kl_cutoff: float=0,
|
|
high_kl_penalty_coefficient: float=1,
|
|
weight: float=1,
|
|
batch_axis: int=0) -> None:
|
|
"""
|
|
Loss for discrete version of Clipped PPO.
|
|
|
|
:param num_actions: number of actions in action space.
|
|
:param clip_likelihood_ratio_using_epsilon: epsilon to use for likelihood ratio clipping.
|
|
:param beta: loss coefficient applied to entropy
|
|
:param use_kl_regularization: option to add kl divergence loss
|
|
:param initial_kl_coefficient: loss coefficient applied kl divergence loss (also see high_kl_penalty_coefficient).
|
|
:param kl_cutoff: threshold for using high_kl_penalty_coefficient
|
|
:param high_kl_penalty_coefficient: loss coefficient applied to kv divergence above kl_cutoff
|
|
:param weight: scalar used to adjust relative weight of loss (if using this loss with others).
|
|
:param batch_axis: axis used for mini-batch (default is 0) and excluded from loss aggregation.
|
|
"""
|
|
super(ClippedPPOLossDiscrete, self).__init__(weight=weight, batch_axis=batch_axis)
|
|
self.weight = weight
|
|
self.num_actions = num_actions
|
|
self.clip_likelihood_ratio_using_epsilon = clip_likelihood_ratio_using_epsilon
|
|
self.beta = beta
|
|
self.use_kl_regularization = use_kl_regularization
|
|
self.initial_kl_coefficient = initial_kl_coefficient if self.use_kl_regularization else 0.0
|
|
self.kl_coefficient = self.params.get('kl_coefficient',
|
|
shape=(1,),
|
|
init=mx.init.Constant([initial_kl_coefficient,]),
|
|
differentiable=False)
|
|
self.kl_cutoff = kl_cutoff
|
|
self.high_kl_penalty_coefficient = high_kl_penalty_coefficient
|
|
|
|
@property
|
|
def input_schema(self) -> LossInputSchema:
|
|
return LossInputSchema(
|
|
head_outputs=['new_policy_probs'],
|
|
agent_inputs=['actions', 'old_policy_probs', 'clip_param_rescaler'],
|
|
targets=['advantages']
|
|
)
|
|
|
|
def loss_forward(self,
|
|
F: ModuleType,
|
|
new_policy_probs: nd_sym_type,
|
|
actions: nd_sym_type,
|
|
old_policy_probs: nd_sym_type,
|
|
clip_param_rescaler: nd_sym_type,
|
|
advantages: nd_sym_type,
|
|
kl_coefficient: nd_sym_type) -> List[Tuple[nd_sym_type, str]]:
|
|
"""
|
|
Used for forward pass through loss computations.
|
|
Works with batches of data, and optionally time_steps, but be consistent in usage: i.e. if using time_step,
|
|
new_policy_probs, old_policy_probs, actions and advantages all must include a time_step dimension.
|
|
|
|
NOTE: order of input arguments MUST NOT CHANGE because it matches the order
|
|
parameters are passed in ppo_agent:train_network()
|
|
|
|
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
|
|
:param new_policy_probs: action probabilities predicted by DiscretePPOHead network,
|
|
of shape (batch_size, num_actions) or
|
|
of shape (batch_size, time_step, num_actions).
|
|
:param old_policy_probs: action probabilities for previous policy,
|
|
of shape (batch_size, num_actions) or
|
|
of shape (batch_size, time_step, num_actions).
|
|
:param actions: true actions taken during rollout,
|
|
of shape (batch_size) or
|
|
of shape (batch_size, time_step).
|
|
:param clip_param_rescaler: scales epsilon to use for likelihood ratio clipping.
|
|
:param advantages: change in state value after taking action (a.k.a advantage)
|
|
of shape (batch_size) or
|
|
of shape (batch_size, time_step).
|
|
:param kl_coefficient: loss coefficient applied kl divergence loss (also see high_kl_penalty_coefficient).
|
|
:return: loss, of shape (batch_size).
|
|
"""
|
|
|
|
old_policy_dist = CategoricalDist(self.num_actions, old_policy_probs, F=F)
|
|
action_probs_wrt_old_policy = old_policy_dist.log_prob(actions)
|
|
|
|
new_policy_dist = CategoricalDist(self.num_actions, new_policy_probs, F=F)
|
|
action_probs_wrt_new_policy = new_policy_dist.log_prob(actions)
|
|
|
|
entropy_loss = - self.beta * new_policy_dist.entropy().mean()
|
|
|
|
if self.use_kl_regularization:
|
|
kl_div = old_policy_dist.kl_div(new_policy_dist).mean()
|
|
weighted_kl_div = kl_coefficient * kl_div
|
|
high_kl_div = F.stack(F.zeros_like(kl_div), kl_div - self.kl_cutoff).max().square()
|
|
weighted_high_kl_div = self.high_kl_penalty_coefficient * high_kl_div
|
|
kl_div_loss = weighted_kl_div + weighted_high_kl_div
|
|
else:
|
|
kl_div_loss = F.zeros(shape=(1,))
|
|
|
|
# working with log probs, so minus first, then exponential (same as division)
|
|
likelihood_ratio = (action_probs_wrt_new_policy - action_probs_wrt_old_policy).exp()
|
|
|
|
if self.clip_likelihood_ratio_using_epsilon is not None:
|
|
# clipping of likelihood ratio
|
|
min_value = 1 - self.clip_likelihood_ratio_using_epsilon * clip_param_rescaler
|
|
max_value = 1 + self.clip_likelihood_ratio_using_epsilon * clip_param_rescaler
|
|
|
|
# can't use F.clip (with variable clipping bounds), hence custom implementation
|
|
clipped_likelihood_ratio = hybrid_clip(F, likelihood_ratio, clip_lower=min_value, clip_upper=max_value)
|
|
|
|
# lower bound of original, and clipped versions or each scaled advantage
|
|
# element-wise min between the two ndarrays
|
|
unclipped_scaled_advantages = likelihood_ratio * advantages
|
|
clipped_scaled_advantages = clipped_likelihood_ratio * advantages
|
|
scaled_advantages = F.stack(unclipped_scaled_advantages, clipped_scaled_advantages).min(axis=0)
|
|
else:
|
|
scaled_advantages = likelihood_ratio * advantages
|
|
clipped_likelihood_ratio = F.zeros_like(likelihood_ratio)
|
|
|
|
# for each batch, calculate expectation of scaled_advantages across time steps,
|
|
# but want code to work with data without time step too, so reshape to add timestep if doesn't exist.
|
|
scaled_advantages_w_time = scaled_advantages.reshape(shape=(0, -1))
|
|
expected_scaled_advantages = scaled_advantages_w_time.mean(axis=1)
|
|
# want to maximize expected_scaled_advantages, add minus so can minimize.
|
|
surrogate_loss = (-expected_scaled_advantages * self.weight).mean()
|
|
|
|
return [
|
|
(surrogate_loss, LOSS_OUT_TYPE_LOSS),
|
|
(entropy_loss + kl_div_loss, LOSS_OUT_TYPE_REGULARIZATION),
|
|
(kl_div_loss, LOSS_OUT_TYPE_KL),
|
|
(entropy_loss, LOSS_OUT_TYPE_ENTROPY),
|
|
(likelihood_ratio, LOSS_OUT_TYPE_LIKELIHOOD_RATIO),
|
|
(clipped_likelihood_ratio, LOSS_OUT_TYPE_CLIPPED_LIKELIHOOD_RATIO)
|
|
]
|
|
|
|
|
|
class ClippedPPOLossContinuous(HeadLoss):
|
|
def __init__(self,
|
|
num_actions: int,
|
|
clip_likelihood_ratio_using_epsilon: float,
|
|
beta: float=0,
|
|
use_kl_regularization: bool=False,
|
|
initial_kl_coefficient: float=1,
|
|
kl_cutoff: float=0,
|
|
high_kl_penalty_coefficient: float=1,
|
|
weight: float=1,
|
|
batch_axis: int=0):
|
|
"""
|
|
Loss for continuous version of Clipped PPO.
|
|
|
|
:param num_actions: number of actions in action space.
|
|
:param clip_likelihood_ratio_using_epsilon: epsilon to use for likelihood ratio clipping.
|
|
:param beta: loss coefficient applied to entropy
|
|
:param batch_axis: axis used for mini-batch (default is 0) and excluded from loss aggregation.
|
|
:param use_kl_regularization: option to add kl divergence loss
|
|
:param initial_kl_coefficient: initial loss coefficient applied kl divergence loss (also see high_kl_penalty_coefficient).
|
|
:param kl_cutoff: threshold for using high_kl_penalty_coefficient
|
|
:param high_kl_penalty_coefficient: loss coefficient applied to kv divergence above kl_cutoff
|
|
:param weight: scalar used to adjust relative weight of loss (if using this loss with others).
|
|
:param batch_axis: axis used for mini-batch (default is 0) and excluded from loss aggregation.
|
|
"""
|
|
super(ClippedPPOLossContinuous, self).__init__(weight=weight, batch_axis=batch_axis)
|
|
self.weight = weight
|
|
self.num_actions = num_actions
|
|
self.clip_likelihood_ratio_using_epsilon = clip_likelihood_ratio_using_epsilon
|
|
self.beta = beta
|
|
self.use_kl_regularization = use_kl_regularization
|
|
self.initial_kl_coefficient = initial_kl_coefficient if self.use_kl_regularization else 0.0
|
|
self.kl_coefficient = self.params.get('kl_coefficient',
|
|
shape=(1,),
|
|
init=mx.init.Constant([initial_kl_coefficient,]),
|
|
differentiable=False)
|
|
self.kl_cutoff = kl_cutoff
|
|
self.high_kl_penalty_coefficient = high_kl_penalty_coefficient
|
|
|
|
@property
|
|
def input_schema(self) -> LossInputSchema:
|
|
return LossInputSchema(
|
|
head_outputs=['new_policy_means','new_policy_stds'],
|
|
agent_inputs=['actions', 'old_policy_means', 'old_policy_stds', 'clip_param_rescaler'],
|
|
targets=['advantages']
|
|
)
|
|
|
|
def loss_forward(self,
|
|
F: ModuleType,
|
|
new_policy_means: nd_sym_type,
|
|
new_policy_stds: nd_sym_type,
|
|
actions: nd_sym_type,
|
|
old_policy_means: nd_sym_type,
|
|
old_policy_stds: nd_sym_type,
|
|
clip_param_rescaler: nd_sym_type,
|
|
advantages: nd_sym_type,
|
|
kl_coefficient: nd_sym_type) -> List[Tuple[nd_sym_type, str]]:
|
|
"""
|
|
Used for forward pass through loss computations.
|
|
Works with batches of data, and optionally time_steps, but be consistent in usage: i.e. if using time_step,
|
|
new_policy_means, old_policy_means, actions and advantages all must include a time_step dimension.
|
|
|
|
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
|
|
:param new_policy_means: action means predicted by MultivariateNormalDist network,
|
|
of shape (batch_size, num_actions) or
|
|
of shape (batch_size, time_step, num_actions).
|
|
:param new_policy_stds: action standard deviation returned by head,
|
|
of shape (batch_size, num_actions) or
|
|
of shape (batch_size, time_step, num_actions).
|
|
:param actions: true actions taken during rollout,
|
|
of shape (batch_size, num_actions) or
|
|
of shape (batch_size, time_step, num_actions).
|
|
:param old_policy_means: action means for previous policy,
|
|
of shape (batch_size, num_actions) or
|
|
of shape (batch_size, time_step, num_actions).
|
|
:param old_policy_stds: action standard deviation returned by head previously,
|
|
of shape (batch_size, num_actions) or
|
|
of shape (batch_size, time_step, num_actions).
|
|
:param clip_param_rescaler: scales epsilon to use for likelihood ratio clipping.
|
|
:param advantages: change in state value after taking action (a.k.a advantage)
|
|
of shape (batch_size,) or
|
|
of shape (batch_size, time_step).
|
|
:param kl_coefficient: loss coefficient applied kl divergence loss (also see high_kl_penalty_coefficient).
|
|
:return: loss, of shape (batch_size).
|
|
"""
|
|
|
|
def diagonal_covariance(stds, size):
|
|
vars = stds ** 2
|
|
# sets diagonal in (batch size and time step) covariance matrices
|
|
vars_tiled = vars.expand_dims(2).tile((1, 1, size))
|
|
covars = F.broadcast_mul(vars_tiled, F.eye(size))
|
|
return covars
|
|
|
|
old_covar = diagonal_covariance(stds=old_policy_stds, size=self.num_actions)
|
|
old_policy_dist = MultivariateNormalDist(self.num_actions, old_policy_means, old_covar, F=F)
|
|
action_probs_wrt_old_policy = old_policy_dist.log_prob(actions)
|
|
|
|
new_covar = diagonal_covariance(stds=new_policy_stds, size=self.num_actions)
|
|
new_policy_dist = MultivariateNormalDist(self.num_actions, new_policy_means, new_covar, F=F)
|
|
action_probs_wrt_new_policy = new_policy_dist.log_prob(actions)
|
|
|
|
entropy_loss = - self.beta * new_policy_dist.entropy().mean()
|
|
|
|
if self.use_kl_regularization:
|
|
kl_div = old_policy_dist.kl_div(new_policy_dist).mean()
|
|
weighted_kl_div = kl_coefficient * kl_div
|
|
high_kl_div = F.stack(F.zeros_like(kl_div), kl_div - self.kl_cutoff).max().square()
|
|
weighted_high_kl_div = self.high_kl_penalty_coefficient * high_kl_div
|
|
kl_div_loss = weighted_kl_div + weighted_high_kl_div
|
|
else:
|
|
kl_div_loss = F.zeros(shape=(1,))
|
|
|
|
# working with log probs, so minus first, then exponential (same as division)
|
|
likelihood_ratio = (action_probs_wrt_new_policy - action_probs_wrt_old_policy).exp()
|
|
|
|
if self.clip_likelihood_ratio_using_epsilon is not None:
|
|
# clipping of likelihood ratio
|
|
min_value = 1 - self.clip_likelihood_ratio_using_epsilon * clip_param_rescaler
|
|
max_value = 1 + self.clip_likelihood_ratio_using_epsilon * clip_param_rescaler
|
|
|
|
# can't use F.clip (with variable clipping bounds), hence custom implementation
|
|
clipped_likelihood_ratio = hybrid_clip(F, likelihood_ratio, clip_lower=min_value, clip_upper=max_value)
|
|
|
|
# lower bound of original, and clipped versions or each scaled advantage
|
|
# element-wise min between the two ndarrays
|
|
unclipped_scaled_advantages = likelihood_ratio * advantages
|
|
clipped_scaled_advantages = clipped_likelihood_ratio * advantages
|
|
scaled_advantages = F.stack(unclipped_scaled_advantages, clipped_scaled_advantages).min(axis=0)
|
|
else:
|
|
scaled_advantages = likelihood_ratio * advantages
|
|
clipped_likelihood_ratio = F.zeros_like(likelihood_ratio)
|
|
|
|
# for each batch, calculate expectation of scaled_advantages across time steps,
|
|
# but want code to work with data without time step too, so reshape to add timestep if doesn't exist.
|
|
scaled_advantages_w_time = scaled_advantages.reshape(shape=(0, -1))
|
|
expected_scaled_advantages = scaled_advantages_w_time.mean(axis=1)
|
|
# want to maximize expected_scaled_advantages, add minus so can minimize.
|
|
surrogate_loss = (-expected_scaled_advantages * self.weight).mean()
|
|
|
|
return [
|
|
(surrogate_loss, LOSS_OUT_TYPE_LOSS),
|
|
(entropy_loss + kl_div_loss, LOSS_OUT_TYPE_REGULARIZATION),
|
|
(kl_div_loss, LOSS_OUT_TYPE_KL),
|
|
(entropy_loss, LOSS_OUT_TYPE_ENTROPY),
|
|
(likelihood_ratio, LOSS_OUT_TYPE_LIKELIHOOD_RATIO),
|
|
(clipped_likelihood_ratio, LOSS_OUT_TYPE_CLIPPED_LIKELIHOOD_RATIO)
|
|
]
|
|
|
|
|
|
class PPOHead(Head):
|
|
def __init__(self,
|
|
agent_parameters: AgentParameters,
|
|
spaces: SpacesDefinition,
|
|
network_name: str,
|
|
head_type_idx: int=0,
|
|
loss_weight: float=1.,
|
|
is_local: bool=True,
|
|
activation_function: str='tanh',
|
|
dense_layer: None=None) -> None:
|
|
"""
|
|
Head block for Proximal Policy Optimization, to calculate probabilities for each action given middleware
|
|
representation of the environment state.
|
|
|
|
:param agent_parameters: containing algorithm parameters such as clip_likelihood_ratio_using_epsilon
|
|
and beta_entropy.
|
|
:param spaces: containing action spaces used for defining size of network output.
|
|
:param network_name: name of head network. currently unused.
|
|
:param head_type_idx: index of head network. currently unused.
|
|
:param loss_weight: scalar used to adjust relative weight of loss (if using this loss with others).
|
|
:param is_local: flag to denote if network is local. currently unused.
|
|
:param activation_function: activation function to use between layers. currently unused.
|
|
:param dense_layer: type of dense layer to use in network. currently unused.
|
|
"""
|
|
super().__init__(agent_parameters, spaces, network_name, head_type_idx, loss_weight, is_local, activation_function,
|
|
dense_layer=dense_layer)
|
|
self.return_type = ActionProbabilities
|
|
|
|
self.clip_likelihood_ratio_using_epsilon = agent_parameters.algorithm.clip_likelihood_ratio_using_epsilon
|
|
self.beta = agent_parameters.algorithm.beta_entropy
|
|
self.use_kl_regularization = agent_parameters.algorithm.use_kl_regularization
|
|
if self.use_kl_regularization:
|
|
self.initial_kl_coefficient = agent_parameters.algorithm.initial_kl_coefficient
|
|
self.kl_cutoff = 2 * agent_parameters.algorithm.target_kl_divergence
|
|
self.high_kl_penalty_coefficient = agent_parameters.algorithm.high_kl_penalty_coefficient
|
|
else:
|
|
self.initial_kl_coefficient, self.kl_cutoff, self.high_kl_penalty_coefficient = (None, None, None)
|
|
self._loss = []
|
|
|
|
if isinstance(self.spaces.action, DiscreteActionSpace):
|
|
self.net = DiscretePPOHead(num_actions=len(self.spaces.action.actions))
|
|
elif isinstance(self.spaces.action, BoxActionSpace):
|
|
self.net = ContinuousPPOHead(num_actions=self.spaces.action.shape[0])
|
|
else:
|
|
raise ValueError("Only discrete or continuous action spaces are supported for PPO.")
|
|
|
|
def hybrid_forward(self,
|
|
F: ModuleType,
|
|
x: nd_sym_type) -> nd_sym_type:
|
|
"""
|
|
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
|
|
:param x: middleware embedding
|
|
:return: policy parameters/probabilities
|
|
"""
|
|
return self.net(x)
|
|
|
|
def loss(self) -> mx.gluon.loss.Loss:
|
|
"""
|
|
Specifies loss block to be used for this policy head.
|
|
|
|
:return: loss block (can be called as function) for action probabilities returned by this policy network.
|
|
"""
|
|
if isinstance(self.spaces.action, DiscreteActionSpace):
|
|
loss = ClippedPPOLossDiscrete(len(self.spaces.action.actions),
|
|
self.clip_likelihood_ratio_using_epsilon,
|
|
self.beta,
|
|
self.use_kl_regularization, self.initial_kl_coefficient,
|
|
self.kl_cutoff, self.high_kl_penalty_coefficient,
|
|
self.loss_weight)
|
|
elif isinstance(self.spaces.action, BoxActionSpace):
|
|
loss = ClippedPPOLossContinuous(self.spaces.action.shape[0],
|
|
self.clip_likelihood_ratio_using_epsilon,
|
|
self.beta,
|
|
self.use_kl_regularization, self.initial_kl_coefficient,
|
|
self.kl_cutoff, self.high_kl_penalty_coefficient,
|
|
self.loss_weight)
|
|
else:
|
|
raise ValueError("Only discrete or continuous action spaces are supported for PPO.")
|
|
loss.initialize()
|
|
# set a property so can assign_kl_coefficient in future,
|
|
# make a list, otherwise it would be added as a child of Head Block (due to type check)
|
|
self._loss = [loss]
|
|
return loss
|
|
|
|
@property
|
|
def kl_divergence(self):
|
|
return self.head_type_idx, LOSS_OUT_TYPE_KL
|
|
|
|
@property
|
|
def entropy(self):
|
|
return self.head_type_idx, LOSS_OUT_TYPE_ENTROPY
|
|
|
|
@property
|
|
def likelihood_ratio(self):
|
|
return self.head_type_idx, LOSS_OUT_TYPE_LIKELIHOOD_RATIO
|
|
|
|
@property
|
|
def clipped_likelihood_ratio(self):
|
|
return self.head_type_idx, LOSS_OUT_TYPE_CLIPPED_LIKELIHOOD_RATIO
|
|
|
|
def assign_kl_coefficient(self, kl_coefficient: float) -> None:
|
|
self._loss[0].kl_coefficient.set_data(mx.nd.array((kl_coefficient,))) |