# coach/rl_coach/architectures/mxnet_components/heads/ppo_head.py

from typing import List, Tuple, Union
from types import ModuleType
import math
import mxnet as mx
from mxnet.gluon import nn
from rl_coach.base_parameters import AgentParameters
from rl_coach.core_types import ActionProbabilities
from rl_coach.spaces import SpacesDefinition, BoxActionSpace, DiscreteActionSpace
from rl_coach.utils import eps
from rl_coach.architectures.mxnet_components.heads.head import Head, HeadLoss, LossInputSchema,\
NormalizedRSSInitializer
from rl_coach.architectures.mxnet_components.heads.head import LOSS_OUT_TYPE_LOSS, LOSS_OUT_TYPE_REGULARIZATION
from rl_coach.architectures.mxnet_components.utils import hybrid_clip, broadcast_like

LOSS_OUT_TYPE_KL = 'kl_divergence'
LOSS_OUT_TYPE_ENTROPY = 'entropy'
LOSS_OUT_TYPE_LIKELIHOOD_RATIO = 'likelihood_ratio'
LOSS_OUT_TYPE_CLIPPED_LIKELIHOOD_RATIO = 'clipped_likelihood_ratio'

nd_sym_type = Union[mx.nd.NDArray, mx.sym.Symbol]


class MultivariateNormalDist:
def __init__(self,
num_var: int,
mean: nd_sym_type,
sigma: nd_sym_type,
F: ModuleType=mx.nd) -> None:
"""
        Distribution object for a Multivariate Normal. Works with batches, and optionally with time steps,
        but be consistent in usage: i.e. if using time_step, then mean, sigma and the data passed to log_prob
        must all include a time_step dimension.
:param num_var: number of variables in distribution
:param mean: mean for each variable,
of shape (num_var) or
of shape (batch_size, num_var) or
of shape (batch_size, time_step, num_var).
:param sigma: covariance matrix,
of shape (num_var, num_var) or
of shape (batch_size, num_var, num_var) or
of shape (batch_size, time_step, num_var, num_var).
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
"""
self.num_var = num_var
self.mean = mean
self.sigma = sigma
self.F = F

    def inverse_using_cholesky(self, matrix: nd_sym_type) -> nd_sym_type:
"""
Calculate inverses for a batch of matrices using Cholesky decomposition method.
:param matrix: matrix (or matrices) to invert,
of shape (num_var, num_var) or
of shape (batch_size, num_var, num_var) or
of shape (batch_size, time_step, num_var, num_var).
:return: inverted matrix (or matrices),
of shape (num_var, num_var) or
of shape (batch_size, num_var, num_var) or
of shape (batch_size, time_step, num_var, num_var).
"""
cholesky_factor = self.F.linalg.potrf(matrix)
return self.F.linalg.potri(cholesky_factor)

    def log_det(self, matrix: nd_sym_type) -> nd_sym_type:
        """
        Calculate log of the determinant for a batch of matrices using Cholesky decomposition method.
        :param matrix: matrix (or matrices) whose log determinant to calculate,
            of shape (num_var, num_var) or
            of shape (batch_size, num_var, num_var) or
            of shape (batch_size, time_step, num_var, num_var).
        :return: log determinant,
            of shape (1) or
            of shape (batch_size) or
            of shape (batch_size, time_step).
"""
cholesky_factor = self.F.linalg.potrf(matrix)
return 2 * self.F.linalg.sumlogdiag(cholesky_factor)

    def log_prob(self, x: nd_sym_type) -> nd_sym_type:
"""
Calculate the log probability of data given the current distribution.
See http://www.notenoughthoughts.net/posts/normal-log-likelihood-gradient.html
and https://discuss.mxnet.io/t/multivariate-gaussian-log-density-operator/1169/7
:param x: input data,
of shape (num_var) or
of shape (batch_size, num_var) or
of shape (batch_size, time_step, num_var).
:return: log_probability,
of shape (1) or
of shape (batch_size) or
of shape (batch_size, time_step).
"""
a = (self.num_var / 2) * math.log(2 * math.pi)
log_det_sigma = self.log_det(self.sigma)
b = (1 / 2) * log_det_sigma
sigma_inv = self.inverse_using_cholesky(self.sigma)
# deviation from mean, and dev_t is equivalent to transpose on last two dims.
dev = (x - self.mean).expand_dims(-1)
dev_t = (x - self.mean).expand_dims(-2)
# since batch_dot only works with ndarrays with ndim of 3,
# and we could have ndarrays with ndim of 4,
# we flatten batch_size and time_step into single dim.
dev_flat = dev.reshape(shape=(-1, 0, 0), reverse=1)
sigma_inv_flat = sigma_inv.reshape(shape=(-1, 0, 0), reverse=1)
dev_t_flat = dev_t.reshape(shape=(-1, 0, 0), reverse=1)
c = (1 / 2) * self.F.batch_dot(self.F.batch_dot(dev_t_flat, sigma_inv_flat), dev_flat)
# and now reshape back to (batch_size, time_step) if required.
c = c.reshape_like(b)
log_likelihood = -a - b - c
return log_likelihood

    def entropy(self) -> nd_sym_type:
"""
Calculate entropy of current distribution.
See http://www.nowozin.net/sebastian/blog/the-entropy-of-a-normal-distribution.html
:return: entropy,
of shape (1) or
of shape (batch_size) or
of shape (batch_size, time_step).
"""
# todo: check if differential entropy is correct
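        # the differential entropy of a multivariate normal has the closed form
        #   H = (k/2) * (1 + log(2 * pi)) + (1/2) * log|sigma|
        # which is what the expression below computes, with k = num_var.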
log_det_sigma = self.log_det(self.sigma)
return (self.num_var / 2) + ((self.num_var / 2) * math.log(2 * math.pi)) + ((1 / 2) * log_det_sigma)

    def kl_div(self, alt_dist) -> nd_sym_type:
        """
        Calculate KL-Divergence with another MultivariateNormalDist distribution.
See https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence
Specifically https://wikimedia.org/api/rest_v1/media/math/render/svg/a3bf3b4917bd1fcb8be48d6d6139e2e387bdc7d3
:param alt_dist: alternative distribution used for kl divergence calculation
:type alt_dist: MultivariateNormalDist
        :return: KL-Divergence, of shape (1,) or of shape (batch_size,).
"""
sigma_a_inv = self.F.linalg.potri(self.F.linalg.potrf(self.sigma))
sigma_b_inv = self.F.linalg.potri(self.F.linalg.potrf(alt_dist.sigma))
        term1a = self.F.batch_dot(sigma_b_inv, self.sigma)
# sum of diagonal for batch of matrices
term1 = (broadcast_like(self.F, self.F.eye(self.num_var), term1a) * term1a).sum(axis=-1).sum(axis=-1)
mean_diff = (alt_dist.mean - self.mean).expand_dims(-1)
mean_diff_t = (alt_dist.mean - self.mean).expand_dims(-2)
term2 = self.F.batch_dot(self.F.batch_dot(mean_diff_t, sigma_b_inv), mean_diff).reshape_like(term1)
term3 = (2 * self.F.linalg.sumlogdiag(self.F.linalg.potrf(alt_dist.sigma))) -\
(2 * self.F.linalg.sumlogdiag(self.F.linalg.potrf(self.sigma)))
return 0.5 * (term1 + term2 - self.num_var + term3)
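
# Example usage of MultivariateNormalDist (a hypothetical sketch, not exercised by this module):
#   mean = mx.nd.zeros((4, 2))                             # batch of 4 samples, 2 action variables
#   sigma = mx.nd.eye(2).expand_dims(0).tile((4, 1, 1))    # identity covariance per sample
#   dist = MultivariateNormalDist(num_var=2, mean=mean, sigma=sigma)
#   log_p = dist.log_prob(mx.nd.zeros((4, 2)))             # shape (4,)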


class CategoricalDist:
def __init__(self, n_classes: int, probs: nd_sym_type, F: ModuleType=mx.nd) -> None:
"""
Distribution object for Categorical data.
        Optionally works with batches and time steps, but be consistent in usage: i.e. if using time_step,
        probs and the actions passed to log_prob must all include a time_step dimension.
:param n_classes: number of classes in distribution
:param probs: probabilities for each class,
of shape (n_classes),
of shape (batch_size, n_classes) or
of shape (batch_size, time_step, n_classes)
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
"""
self.n_classes = n_classes
self.probs = probs
self.F = F

    def log_prob(self, actions: nd_sym_type) -> nd_sym_type:
"""
Calculate the log probability of data given the current distribution.
:param actions: actions, with int8 data type,
of shape (1) if probs was (n_classes),
of shape (batch_size) if probs was (batch_size, n_classes) and
of shape (batch_size, time_step) if probs was (batch_size, time_step, n_classes)
:return: log_probability,
of shape (1) if probs was (n_classes),
of shape (batch_size) if probs was (batch_size, n_classes) and
of shape (batch_size, time_step) if probs was (batch_size, time_step, n_classes)
"""
action_mask = actions.one_hot(depth=self.n_classes)
action_probs = (self.probs * action_mask).sum(axis=-1)
return action_probs.log()

    def entropy(self) -> nd_sym_type:
"""
Calculate entropy of current distribution.
:return: entropy,
of shape (1) if probs was (n_classes),
of shape (batch_size) if probs was (batch_size, n_classes) and
of shape (batch_size, time_step) if probs was (batch_size, time_step, n_classes)
"""
# todo: look into numerical stability
return -(self.probs.log()*self.probs).sum(axis=-1)

    def kl_div(self, alt_dist) -> nd_sym_type:
        """
        Calculate KL-Divergence with another Categorical distribution.
:param alt_dist: alternative distribution used for kl divergence calculation
:type alt_dist: CategoricalDist
        :return: KL-Divergence,
            of shape (1) if probs was (n_classes),
            of shape (batch_size) if probs was (batch_size, n_classes) and
            of shape (batch_size, time_step) if probs was (batch_size, time_step, n_classes)
"""
logits_a = self.probs.clip(a_min=eps, a_max=1 - eps).log()
logits_b = alt_dist.probs.clip(a_min=eps, a_max=1 - eps).log()
t = self.probs * (logits_a - logits_b)
t = self.F.where(condition=(alt_dist.probs == 0), x=self.F.ones_like(alt_dist.probs) * math.inf, y=t)
t = self.F.where(condition=(self.probs == 0), x=self.F.zeros_like(self.probs), y=t)
return t.sum(axis=-1)
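
# Example usage of CategoricalDist (a hypothetical sketch, not exercised by this module):
#   probs = mx.nd.softmax(mx.nd.random.normal(shape=(4, 3)))  # batch of 4 samples, 3 classes
#   dist = CategoricalDist(n_classes=3, probs=probs)
#   log_p = dist.log_prob(mx.nd.array([0, 2, 1, 0]))          # shape (4,)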


class DiscretePPOHead(nn.HybridBlock):
def __init__(self, num_actions: int) -> None:
"""
Head block for Discrete Proximal Policy Optimization, to calculate probabilities for each action given
middleware representation of the environment state.
:param num_actions: number of actions in action space.
"""
super(DiscretePPOHead, self).__init__()
with self.name_scope():
self.dense = nn.Dense(units=num_actions, flatten=False,
weight_initializer=NormalizedRSSInitializer(0.01))

    def hybrid_forward(self, F: ModuleType, x: nd_sym_type) -> nd_sym_type:
"""
Used for forward pass through head network.
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
:param x: middleware state representation,
of shape (batch_size, in_channels) or
of shape (batch_size, time_step, in_channels).
:return: batch of probabilities for each action,
of shape (batch_size, num_actions) or
of shape (batch_size, time_step, num_actions).
"""
policy_values = self.dense(x)
policy_probs = F.softmax(policy_values)
return policy_probs
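
# Example usage of DiscretePPOHead (a hypothetical sketch, not exercised by this module):
#   head = DiscretePPOHead(num_actions=3)
#   head.initialize()
#   probs = head(mx.nd.random.normal(shape=(4, 16)))  # shape (4, 3), rows sum to 1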


class ContinuousPPOHead(nn.HybridBlock):
def __init__(self, num_actions: int) -> None:
"""
        Head block for Continuous Proximal Policy Optimization, to calculate the mean and standard deviation of a
        Gaussian policy over actions, given the middleware representation of the environment state.
:param num_actions: number of actions in action space.
"""
super(ContinuousPPOHead, self).__init__()
with self.name_scope():
self.dense = nn.Dense(units=num_actions, flatten=False,
weight_initializer=NormalizedRSSInitializer(0.01))
# all samples (across batch, and time step) share the same covariance, which is learnt,
# but since we assume the action probability variables are independent,
# only the diagonal entries of the covariance matrix are specified.
self.log_std = self.params.get('log_std',
shape=(num_actions,),
init=mx.init.Zero(),
allow_deferred_init=True)
# todo: is_local?

    def hybrid_forward(self, F: ModuleType, x: nd_sym_type, log_std: nd_sym_type) -> Tuple[nd_sym_type, nd_sym_type]:
        """
        Used for forward pass through head network.
        :param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
        :param x: middleware state representation,
            of shape (batch_size, in_channels) or
            of shape (batch_size, time_step, in_channels).
        :param log_std: log of the standard deviation for each action, of shape (num_actions,).
        :return: batch of means and standard deviations for each action, each
            of shape (batch_size, num_actions) or
            of shape (batch_size, time_step, num_actions).
"""
policy_means = self.dense(x)
policy_std = broadcast_like(F, log_std.exp().expand_dims(0), policy_means)
return policy_means, policy_std
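
# Example usage of ContinuousPPOHead (a hypothetical sketch, not exercised by this module):
#   head = ContinuousPPOHead(num_actions=2)
#   head.initialize()
#   means, stds = head(mx.nd.random.normal(shape=(4, 16)))  # each of shape (4, 2)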


class ClippedPPOLossDiscrete(HeadLoss):
def __init__(self,
num_actions: int,
clip_likelihood_ratio_using_epsilon: float,
beta: float=0,
use_kl_regularization: bool=False,
initial_kl_coefficient: float=1,
kl_cutoff: float=0,
high_kl_penalty_coefficient: float=1,
weight: float=1,
batch_axis: int=0) -> None:
"""
Loss for discrete version of Clipped PPO.
:param num_actions: number of actions in action space.
:param clip_likelihood_ratio_using_epsilon: epsilon to use for likelihood ratio clipping.
:param beta: loss coefficient applied to entropy
:param use_kl_regularization: option to add kl divergence loss
        :param initial_kl_coefficient: loss coefficient applied to kl divergence loss (also see high_kl_penalty_coefficient).
        :param kl_cutoff: threshold for using high_kl_penalty_coefficient
        :param high_kl_penalty_coefficient: loss coefficient applied to kl divergence above kl_cutoff
:param weight: scalar used to adjust relative weight of loss (if using this loss with others).
:param batch_axis: axis used for mini-batch (default is 0) and excluded from loss aggregation.
"""
super(ClippedPPOLossDiscrete, self).__init__(weight=weight, batch_axis=batch_axis)
self.weight = weight
self.num_actions = num_actions
self.clip_likelihood_ratio_using_epsilon = clip_likelihood_ratio_using_epsilon
self.beta = beta
self.use_kl_regularization = use_kl_regularization
self.initial_kl_coefficient = initial_kl_coefficient if self.use_kl_regularization else 0.0
        self.kl_coefficient = self.params.get('kl_coefficient',
                                              shape=(1,),
                                              init=mx.init.Constant([self.initial_kl_coefficient, ]),
                                              differentiable=False)
self.kl_cutoff = kl_cutoff
self.high_kl_penalty_coefficient = high_kl_penalty_coefficient

    @property
def input_schema(self) -> LossInputSchema:
return LossInputSchema(
head_outputs=['new_policy_probs'],
agent_inputs=['actions', 'old_policy_probs', 'clip_param_rescaler'],
targets=['advantages']
)

    def loss_forward(self,
F: ModuleType,
new_policy_probs: nd_sym_type,
actions: nd_sym_type,
old_policy_probs: nd_sym_type,
clip_param_rescaler: nd_sym_type,
advantages: nd_sym_type,
kl_coefficient: nd_sym_type) -> List[Tuple[nd_sym_type, str]]:
"""
Used for forward pass through loss computations.
Works with batches of data, and optionally time_steps, but be consistent in usage: i.e. if using time_step,
new_policy_probs, old_policy_probs, actions and advantages all must include a time_step dimension.
NOTE: order of input arguments MUST NOT CHANGE because it matches the order
parameters are passed in ppo_agent:train_network()
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
        :param new_policy_probs: action probabilities predicted by DiscretePPOHead network,
            of shape (batch_size, num_actions) or
            of shape (batch_size, time_step, num_actions).
        :param actions: true actions taken during rollout,
            of shape (batch_size) or
            of shape (batch_size, time_step).
        :param old_policy_probs: action probabilities for previous policy,
            of shape (batch_size, num_actions) or
            of shape (batch_size, time_step, num_actions).
        :param clip_param_rescaler: scales epsilon to use for likelihood ratio clipping.
        :param advantages: change in state value after taking action (a.k.a. advantage),
            of shape (batch_size) or
            of shape (batch_size, time_step).
        :param kl_coefficient: loss coefficient applied to kl divergence loss (also see high_kl_penalty_coefficient).
        :return: loss outputs as a list of (value, loss type) tuples; the surrogate loss itself is a scalar.
"""
old_policy_dist = CategoricalDist(self.num_actions, old_policy_probs, F=F)
action_probs_wrt_old_policy = old_policy_dist.log_prob(actions)
new_policy_dist = CategoricalDist(self.num_actions, new_policy_probs, F=F)
action_probs_wrt_new_policy = new_policy_dist.log_prob(actions)
entropy_loss = - self.beta * new_policy_dist.entropy().mean()
if self.use_kl_regularization:
kl_div = old_policy_dist.kl_div(new_policy_dist).mean()
weighted_kl_div = kl_coefficient * kl_div
high_kl_div = F.stack(F.zeros_like(kl_div), kl_div - self.kl_cutoff).max().square()
weighted_high_kl_div = self.high_kl_penalty_coefficient * high_kl_div
kl_div_loss = weighted_kl_div + weighted_high_kl_div
else:
kl_div_loss = F.zeros(shape=(1,))
        # working with log probs, so subtract first, then exponentiate (equivalent to dividing the probabilities)
likelihood_ratio = (action_probs_wrt_new_policy - action_probs_wrt_old_policy).exp()
if self.clip_likelihood_ratio_using_epsilon is not None:
# clipping of likelihood ratio
min_value = 1 - self.clip_likelihood_ratio_using_epsilon * clip_param_rescaler
max_value = 1 + self.clip_likelihood_ratio_using_epsilon * clip_param_rescaler
# can't use F.clip (with variable clipping bounds), hence custom implementation
clipped_likelihood_ratio = hybrid_clip(F, likelihood_ratio, clip_lower=min_value, clip_upper=max_value)
            # take the element-wise minimum of the unclipped and clipped versions of each scaled advantage
unclipped_scaled_advantages = likelihood_ratio * advantages
clipped_scaled_advantages = clipped_likelihood_ratio * advantages
scaled_advantages = F.stack(unclipped_scaled_advantages, clipped_scaled_advantages).min(axis=0)
else:
scaled_advantages = likelihood_ratio * advantages
clipped_likelihood_ratio = F.zeros_like(likelihood_ratio)
        # for each sample, calculate the expectation of scaled_advantages across time steps;
        # the reshape adds a time step dimension if one doesn't already exist, so both cases are handled.
scaled_advantages_w_time = scaled_advantages.reshape(shape=(0, -1))
expected_scaled_advantages = scaled_advantages_w_time.mean(axis=1)
        # we want to maximize expected_scaled_advantages, so negate it to obtain a loss to minimize.
surrogate_loss = (-expected_scaled_advantages * self.weight).mean()
return [
(surrogate_loss, LOSS_OUT_TYPE_LOSS),
(entropy_loss + kl_div_loss, LOSS_OUT_TYPE_REGULARIZATION),
(kl_div_loss, LOSS_OUT_TYPE_KL),
(entropy_loss, LOSS_OUT_TYPE_ENTROPY),
(likelihood_ratio, LOSS_OUT_TYPE_LIKELIHOOD_RATIO),
(clipped_likelihood_ratio, LOSS_OUT_TYPE_CLIPPED_LIKELIHOOD_RATIO)
]
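
# Worked example of the clipping above (hypothetical numbers): with clip_likelihood_ratio_using_epsilon = 0.2
# and clip_param_rescaler = 1, a likelihood ratio of 1.5 is clipped to the range [0.8, 1.2]. For an advantage
# of +1.0, min(1.5 * 1.0, 1.2 * 1.0) = 1.2 bounds how far a favourable update can move the policy; for an
# advantage of -1.0, min(1.5 * -1.0, 1.2 * -1.0) = -1.5 leaves the penalty unclipped.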


class ClippedPPOLossContinuous(HeadLoss):
def __init__(self,
num_actions: int,
clip_likelihood_ratio_using_epsilon: float,
beta: float=0,
use_kl_regularization: bool=False,
initial_kl_coefficient: float=1,
kl_cutoff: float=0,
high_kl_penalty_coefficient: float=1,
weight: float=1,
batch_axis: int=0):
"""
Loss for continuous version of Clipped PPO.
:param num_actions: number of actions in action space.
:param clip_likelihood_ratio_using_epsilon: epsilon to use for likelihood ratio clipping.
:param beta: loss coefficient applied to entropy
        :param use_kl_regularization: option to add kl divergence loss
        :param initial_kl_coefficient: initial loss coefficient applied to kl divergence loss (also see high_kl_penalty_coefficient).
        :param kl_cutoff: threshold for using high_kl_penalty_coefficient
        :param high_kl_penalty_coefficient: loss coefficient applied to kl divergence above kl_cutoff
:param weight: scalar used to adjust relative weight of loss (if using this loss with others).
:param batch_axis: axis used for mini-batch (default is 0) and excluded from loss aggregation.
"""
super(ClippedPPOLossContinuous, self).__init__(weight=weight, batch_axis=batch_axis)
self.weight = weight
self.num_actions = num_actions
self.clip_likelihood_ratio_using_epsilon = clip_likelihood_ratio_using_epsilon
self.beta = beta
self.use_kl_regularization = use_kl_regularization
self.initial_kl_coefficient = initial_kl_coefficient if self.use_kl_regularization else 0.0
        self.kl_coefficient = self.params.get('kl_coefficient',
                                              shape=(1,),
                                              init=mx.init.Constant([self.initial_kl_coefficient, ]),
                                              differentiable=False)
self.kl_cutoff = kl_cutoff
self.high_kl_penalty_coefficient = high_kl_penalty_coefficient

    @property
    def input_schema(self) -> LossInputSchema:
        return LossInputSchema(
            head_outputs=['new_policy_means', 'new_policy_stds'],
agent_inputs=['actions', 'old_policy_means', 'old_policy_stds', 'clip_param_rescaler'],
targets=['advantages']
)

    def loss_forward(self,
F: ModuleType,
new_policy_means: nd_sym_type,
new_policy_stds: nd_sym_type,
actions: nd_sym_type,
old_policy_means: nd_sym_type,
old_policy_stds: nd_sym_type,
clip_param_rescaler: nd_sym_type,
advantages: nd_sym_type,
kl_coefficient: nd_sym_type) -> List[Tuple[nd_sym_type, str]]:
"""
Used for forward pass through loss computations.
Works with batches of data, and optionally time_steps, but be consistent in usage: i.e. if using time_step,
new_policy_means, old_policy_means, actions and advantages all must include a time_step dimension.
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
        :param new_policy_means: action means predicted by ContinuousPPOHead network,
of shape (batch_size, num_actions) or
of shape (batch_size, time_step, num_actions).
:param new_policy_stds: action standard deviation returned by head,
of shape (batch_size, num_actions) or
of shape (batch_size, time_step, num_actions).
:param actions: true actions taken during rollout,
of shape (batch_size, num_actions) or
of shape (batch_size, time_step, num_actions).
:param old_policy_means: action means for previous policy,
of shape (batch_size, num_actions) or
of shape (batch_size, time_step, num_actions).
:param old_policy_stds: action standard deviation returned by head previously,
of shape (batch_size, num_actions) or
of shape (batch_size, time_step, num_actions).
:param clip_param_rescaler: scales epsilon to use for likelihood ratio clipping.
        :param advantages: change in state value after taking action (a.k.a. advantage),
            of shape (batch_size,) or
            of shape (batch_size, time_step).
        :param kl_coefficient: loss coefficient applied to kl divergence loss (also see high_kl_penalty_coefficient).
        :return: loss outputs as a list of (value, loss type) tuples; the surrogate loss itself is a scalar.
"""
        def diagonal_covariance(stds, size):
            variances = stds ** 2
            # place the variances on the diagonal of (batch_size, and optionally time_step) covariance matrices
            vars_tiled = variances.expand_dims(2).tile((1, 1, size))
            covars = F.broadcast_mul(vars_tiled, F.eye(size))
return covars
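        # e.g. stds of shape (batch_size, num_actions) yields covariances of shape
        # (batch_size, num_actions, num_actions) with stds ** 2 on the diagonal and zeros
        # elsewhere, since the action variables are assumed independent.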
old_covar = diagonal_covariance(stds=old_policy_stds, size=self.num_actions)
old_policy_dist = MultivariateNormalDist(self.num_actions, old_policy_means, old_covar, F=F)
action_probs_wrt_old_policy = old_policy_dist.log_prob(actions)
new_covar = diagonal_covariance(stds=new_policy_stds, size=self.num_actions)
new_policy_dist = MultivariateNormalDist(self.num_actions, new_policy_means, new_covar, F=F)
action_probs_wrt_new_policy = new_policy_dist.log_prob(actions)
entropy_loss = - self.beta * new_policy_dist.entropy().mean()
if self.use_kl_regularization:
kl_div = old_policy_dist.kl_div(new_policy_dist).mean()
weighted_kl_div = kl_coefficient * kl_div
high_kl_div = F.stack(F.zeros_like(kl_div), kl_div - self.kl_cutoff).max().square()
weighted_high_kl_div = self.high_kl_penalty_coefficient * high_kl_div
kl_div_loss = weighted_kl_div + weighted_high_kl_div
else:
kl_div_loss = F.zeros(shape=(1,))
        # working with log probs, so subtract first, then exponentiate (equivalent to dividing the probabilities)
likelihood_ratio = (action_probs_wrt_new_policy - action_probs_wrt_old_policy).exp()
if self.clip_likelihood_ratio_using_epsilon is not None:
# clipping of likelihood ratio
min_value = 1 - self.clip_likelihood_ratio_using_epsilon * clip_param_rescaler
max_value = 1 + self.clip_likelihood_ratio_using_epsilon * clip_param_rescaler
# can't use F.clip (with variable clipping bounds), hence custom implementation
clipped_likelihood_ratio = hybrid_clip(F, likelihood_ratio, clip_lower=min_value, clip_upper=max_value)
            # take the element-wise minimum of the unclipped and clipped versions of each scaled advantage
unclipped_scaled_advantages = likelihood_ratio * advantages
clipped_scaled_advantages = clipped_likelihood_ratio * advantages
scaled_advantages = F.stack(unclipped_scaled_advantages, clipped_scaled_advantages).min(axis=0)
else:
scaled_advantages = likelihood_ratio * advantages
clipped_likelihood_ratio = F.zeros_like(likelihood_ratio)
        # for each sample, calculate the expectation of scaled_advantages across time steps;
        # the reshape adds a time step dimension if one doesn't already exist, so both cases are handled.
scaled_advantages_w_time = scaled_advantages.reshape(shape=(0, -1))
expected_scaled_advantages = scaled_advantages_w_time.mean(axis=1)
        # we want to maximize expected_scaled_advantages, so negate it to obtain a loss to minimize.
surrogate_loss = (-expected_scaled_advantages * self.weight).mean()
return [
(surrogate_loss, LOSS_OUT_TYPE_LOSS),
(entropy_loss + kl_div_loss, LOSS_OUT_TYPE_REGULARIZATION),
(kl_div_loss, LOSS_OUT_TYPE_KL),
(entropy_loss, LOSS_OUT_TYPE_ENTROPY),
(likelihood_ratio, LOSS_OUT_TYPE_LIKELIHOOD_RATIO),
(clipped_likelihood_ratio, LOSS_OUT_TYPE_CLIPPED_LIKELIHOOD_RATIO)
]


class PPOHead(Head):
def __init__(self,
agent_parameters: AgentParameters,
spaces: SpacesDefinition,
network_name: str,
head_type_idx: int=0,
loss_weight: float=1.,
is_local: bool=True,
activation_function: str='tanh',
dense_layer: None=None) -> None:
"""
        Head block for Proximal Policy Optimization, to calculate the policy distribution parameters (action
        probabilities for discrete action spaces; means and standard deviations for continuous ones) given the
        middleware representation of the environment state.
:param agent_parameters: containing algorithm parameters such as clip_likelihood_ratio_using_epsilon
and beta_entropy.
:param spaces: containing action spaces used for defining size of network output.
:param network_name: name of head network. currently unused.
:param head_type_idx: index of head network. currently unused.
:param loss_weight: scalar used to adjust relative weight of loss (if using this loss with others).
:param is_local: flag to denote if network is local. currently unused.
:param activation_function: activation function to use between layers. currently unused.
:param dense_layer: type of dense layer to use in network. currently unused.
"""
super().__init__(agent_parameters, spaces, network_name, head_type_idx, loss_weight, is_local, activation_function,
dense_layer=dense_layer)
self.return_type = ActionProbabilities
self.clip_likelihood_ratio_using_epsilon = agent_parameters.algorithm.clip_likelihood_ratio_using_epsilon
self.beta = agent_parameters.algorithm.beta_entropy
self.use_kl_regularization = agent_parameters.algorithm.use_kl_regularization
if self.use_kl_regularization:
self.initial_kl_coefficient = agent_parameters.algorithm.initial_kl_coefficient
self.kl_cutoff = 2 * agent_parameters.algorithm.target_kl_divergence
self.high_kl_penalty_coefficient = agent_parameters.algorithm.high_kl_penalty_coefficient
else:
self.initial_kl_coefficient, self.kl_cutoff, self.high_kl_penalty_coefficient = (None, None, None)
self._loss = []
if isinstance(self.spaces.action, DiscreteActionSpace):
self.net = DiscretePPOHead(num_actions=len(self.spaces.action.actions))
elif isinstance(self.spaces.action, BoxActionSpace):
self.net = ContinuousPPOHead(num_actions=self.spaces.action.shape[0])
else:
raise ValueError("Only discrete or continuous action spaces are supported for PPO.")

    def hybrid_forward(self,
F: ModuleType,
x: nd_sym_type) -> nd_sym_type:
"""
:param (mx.nd or mx.sym) F: backend api (mx.sym if block has been hybridized).
:param x: middleware embedding
:return: policy parameters/probabilities
"""
return self.net(x)

    def loss(self) -> mx.gluon.loss.Loss:
"""
Specifies loss block to be used for this policy head.
:return: loss block (can be called as function) for action probabilities returned by this policy network.
"""
if isinstance(self.spaces.action, DiscreteActionSpace):
loss = ClippedPPOLossDiscrete(len(self.spaces.action.actions),
self.clip_likelihood_ratio_using_epsilon,
self.beta,
self.use_kl_regularization, self.initial_kl_coefficient,
self.kl_cutoff, self.high_kl_penalty_coefficient,
self.loss_weight)
elif isinstance(self.spaces.action, BoxActionSpace):
loss = ClippedPPOLossContinuous(self.spaces.action.shape[0],
self.clip_likelihood_ratio_using_epsilon,
self.beta,
self.use_kl_regularization, self.initial_kl_coefficient,
self.kl_cutoff, self.high_kl_penalty_coefficient,
self.loss_weight)
else:
raise ValueError("Only discrete or continuous action spaces are supported for PPO.")
loss.initialize()
        # keep a reference so assign_kl_coefficient can update the coefficient later.
        # wrap it in a list, otherwise it would be registered as a child of the Head block (due to gluon's type check).
self._loss = [loss]
return loss

    @property
    def kl_divergence(self):
        return self.head_type_idx, LOSS_OUT_TYPE_KL

    @property
    def entropy(self):
        return self.head_type_idx, LOSS_OUT_TYPE_ENTROPY

    @property
    def likelihood_ratio(self):
        return self.head_type_idx, LOSS_OUT_TYPE_LIKELIHOOD_RATIO

    @property
    def clipped_likelihood_ratio(self):
        return self.head_type_idx, LOSS_OUT_TYPE_CLIPPED_LIKELIHOOD_RATIO

    def assign_kl_coefficient(self, kl_coefficient: float) -> None:
self._loss[0].kl_coefficient.set_data(mx.nd.array((kl_coefficient,)))
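
# Example usage of PPOHead (a hypothetical sketch; agent_params and spaces would come from a configured preset):
#   head = PPOHead(agent_params, spaces, network_name='main')
#   loss_block = head.loss()
#   head.assign_kl_coefficient(0.5)  # e.g. anneal the kl penalty between training epochs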