mirror of https://github.com/gryf/coach.git synced 2025-12-17 19:20:19 +01:00
coach/rl_coach/agents/agent.py
shadiendrawis 0896f43097 Robosuite exploration (#478)
* Add Robosuite parameters for all env types + initialize env flow

* Init flow done

* Rest of Environment API complete for RobosuiteEnvironment

* RobosuiteEnvironment changes

* Observation stacking filter
* Add proper frame_skip in addition to control_freq
* Hardcode Coach rendering to 'frontview' camera

* Robosuite_Lift_DDPG preset + Robosuite env updates

* Move observation stacking filter from env to preset
* Pre-process observation - concatenate depth map (if exists)
  to image and object state (if exists) to robot state
* Preset parameters based on Surreal DDPG parameters, taken from:
  https://github.com/SurrealAI/surreal/blob/master/surreal/main/ddpg_configs.py

* RobosuiteEnvironment fixes - working now with PyGame rendering

* Preset minor modifications

* ObservationStackingFilter - option to concat non-vector observations

* Consider frame skip when setting horizon in robosuite env

* Robosuite lift preset - update heatup length and training interval

* Robosuite env - change control_freq to 10 to match Surreal usage

* Robosuite clipped PPO preset

* Distribute multiple workers (-n #) over multiple GPUs

* Clipped PPO memory optimization from @shadiendrawis

* Fixes to evaluation only workers

* RoboSuite_ClippedPPO: Update training interval

* Undo last commit (update training interval)

* Fix "doube-negative" if conditions

* multi-agent single-trainer clipped ppo training with cartpole

* cleanups (not done yet) + ~tuned hyper-params for MAST

* Switch to Robosuite v1 APIs

* Change presets to IK controller

* more cleanups + enabling evaluation worker + better logging

* RoboSuite_Lift_ClippedPPO updates

* Fix major bug in obs normalization filter setup

* Reduce coupling between Robosuite API and Coach environment

* Now only non task-specific parameters are explicitly defined
  in Coach
* Removed a bunch of enums of Robosuite elements, using simple
  strings instead
* With this change new environments/robots/controllers in Robosuite
  can be used immediately in Coach

* MAST: better logging of actor-trainer interaction + bug fixes + performance improvements.

Still missing: fixed pubsub for obs normalization running stats + logging for trainer signals

* lstm support for ppo

* setting JOINT VELOCITY action space by default + fix for EveryNEpisodes video dump filter + new TaskIDDumpFilter + allowing OR between video dump filters

* Separate Robosuite clipped PPO preset for the non-MAST case

* Add flatten layer to architectures and use it in Robosuite presets

This is required for embedders that mix conv and dense

TODO: Add MXNet implementation

* publishing running_stats together with the published policy + hyper-param for when to publish a policy + cleanups

* bug-fix for memory leak in MAST

* Bugfix: Return value in TF BatchnormActivationDropout.to_tf_instance

* Explicit activations in embedder scheme so there's no ReLU after flatten

* Add clipped PPO heads with configurable dense layers at the beginning

* This is a workaround needed to mimic Surreal-PPO, where the CNN and
  LSTM are shared between actor and critic but the FC layers are not
  shared
* Added a "SchemeBuilder" class, currently only used for the new heads
  but we can change Middleware and Embedder implementations to use it
  as well

* Video dump setting fix in basic preset

* logging screen output to file

* coach to start the redis-server for a MAST run

* trainer drops off-policy data + old policy in ClippedPPO updates only after policy was published + logging free memory stats + actors check for a new policy only at the beginning of a new episode + fixed a bug where the trainer was logging "Training Reward = 0", causing dashboard to incorrectly display the signal

* Add missing set_internal_state function in TFSharedRunningStats

* Robosuite preset - use SingleLevelSelect instead of hard-coded level

* policy ID published directly on Redis

* Small fix when writing to log file

* Major bugfix in Robosuite presets - pass dense sizes to heads

* RoboSuite_Lift_ClippedPPO hyper-params update

* add horizon and value bootstrap to GAE calculation, fix A3C with LSTM

* adam hyper-params from mujoco

* updated MAST preset with IK_POSE_POS controller

* configurable initialization for policy stdev + custom extra noise per actor + logging of policy stdev to dashboard

* values loss weighting of 0.5

* minor fixes + presets

* bug-fix for MAST where the old policy in the trainer kept updating every training iter, while it should only update after every policy publish

* bug-fix: reset_internal_state was not called by the trainer

* bug-fixes in the lstm flow + some hyper-param adjustments for CartPole_ClippedPPO_LSTM -> training and sometimes reaches 200

* adding back the horizon hyper-param - a messy commit

* another bug-fix missing from prev commit

* set control_freq=2 to match action_scale 0.125

* ClippedPPO with MAST cleanups and some preps for TD3 with MAST

* TD3 presets. RoboSuite_Lift_TD3 seems to work well with multi-process runs (-n 8)

* setting termination on collision to be on by default

* bug-fix following prev-prev commit

* initial cube exploration environment with TD3 commit

* bug fix + minor refactoring

* several parameter changes and RND debugging

* Robosuite Gym wrapper + Rename TD3_Random* -> Random*

* algorithm update

* Add RoboSuite v1 env + presets (to eventually replace non-v1 ones)

* Remove grasping presets, keep only V1 exp. presets (w/o V1 tag)

* Keep just robosuite V1 env as the 'robosuite_environment' module

* Exclude Robosuite and MAST presets from integration tests

* Exclude LSTM and MAST presets from golden tests

* Fix mistakenly removed import

* Revert debug changes in ReaderWriterLock

* Try another way to exclude LSTM/MAST golden tests

* Remove debug prints

* Remove PreDense heads, unused in the end

* Missed removing an instance of PreDense head

* Remove MAST, not required for this PR

* Undo unused concat option in ObservationStackingFilter

* Remove LSTM updates, not required in this PR

* Update README.md

* code changes for the exploration flow to work with robosuite master branch

* code cleanup + documentation

* jupyter tutorial for the goal-based exploration + scatter plot

* typo fix

* Update README.md

* separate parameter for the obs-goal observation + small fixes

* code clarity fixes

* adjustment in tutorial 5

* Update tutorial

* Update tutorial

Co-authored-by: Guy Jacob <guy.jacob@intel.com>
Co-authored-by: Gal Leibovich <gal.leibovich@intel.com>
Co-authored-by: shadi.endrawis <sendrawi@aipg-ra-skx-03.ra.intel.com>
2021-06-01 00:34:19 +03:00


#
# Copyright (c) 2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import random
from collections import OrderedDict
from typing import Dict, List, Union, Tuple
import numpy as np
from six.moves import range
from rl_coach.agents.agent_interface import AgentInterface
from rl_coach.architectures.network_wrapper import NetworkWrapper
from rl_coach.base_parameters import AgentParameters, Device, DeviceType, DistributedTaskParameters, Frameworks
from rl_coach.core_types import RunPhase, PredictionType, EnvironmentEpisodes, ActionType, Batch, Episode, StateType
from rl_coach.core_types import Transition, ActionInfo, TrainingSteps, EnvironmentSteps, EnvResponse
from rl_coach.logger import screen, Logger, EpisodeLogger
from rl_coach.memories.episodic.episodic_experience_replay import EpisodicExperienceReplay
from rl_coach.saver import SaverCollection
from rl_coach.spaces import SpacesDefinition, VectorObservationSpace, GoalsSpace, AttentionActionSpace
from rl_coach.utils import Signal, force_list
from rl_coach.utils import dynamic_import_and_instantiate_module_from_params
from rl_coach.memories.backend.memory_impl import get_memory_backend
from rl_coach.core_types import TimeTypes
from rl_coach.off_policy_evaluators.ope_manager import OpeManager
from rl_coach.core_types import PickledReplayBuffer, CsvDataset
class Agent(AgentInterface):
def __init__(self, agent_parameters: AgentParameters, parent: Union['LevelManager', 'CompositeAgent']=None):
"""
:param agent_parameters: An AgentParameters class instance with all of the agent's parameters
"""
super().__init__()
# use seed
if agent_parameters.task_parameters.seed is not None:
random.seed(agent_parameters.task_parameters.seed)
np.random.seed(agent_parameters.task_parameters.seed)
else:
# we need to seed the RNG since the different processes are initialized with the same parent seed
random.seed()
np.random.seed()
self.ap = agent_parameters
self.task_id = self.ap.task_parameters.task_index
self.is_chief = self.task_id == 0
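# the worker with task index 0 acts as the chief, and is responsible for creating resources shared with other workers (e.g. a shared memory)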
self.shared_memory = type(agent_parameters.task_parameters) == DistributedTaskParameters \
and self.ap.memory.shared_memory
if self.shared_memory:
self.shared_memory_scratchpad = self.ap.task_parameters.shared_memory_scratchpad
self.parent = parent
self.parent_level_manager = None
# TODO this needs to be sorted out. Why the duplicates for the agent's name?
self.full_name_id = agent_parameters.full_name_id = self.name = agent_parameters.name
if type(agent_parameters.task_parameters) == DistributedTaskParameters:
screen.log_title("Creating agent - name: {} task id: {} (may take up to 30 seconds due to "
"tensorflow wake up time)".format(self.full_name_id, self.task_id))
else:
screen.log_title("Creating agent - name: {}".format(self.full_name_id))
self.imitation = False
self.agent_logger = Logger()
self.agent_episode_logger = EpisodeLogger()
# get the memory
# - distributed training + shared memory:
# * is chief? -> create the memory and add it to the scratchpad
# * not chief? -> wait for the chief to create the memory and then fetch it
# - non distributed training / not shared memory:
# * create memory
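# the memory parameters' path is of the form '<module path>:<class name>'; the class name is used as part of the lookup key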
memory_name = self.ap.memory.path.split(':')[1]
self.memory_lookup_name = self.full_name_id + '.' + memory_name
if self.shared_memory and not self.is_chief:
self.memory = self.shared_memory_scratchpad.get(self.memory_lookup_name)
else:
# modules
self.memory = dynamic_import_and_instantiate_module_from_params(self.ap.memory)
if hasattr(self.ap.memory, 'memory_backend_params'):
self.memory_backend = get_memory_backend(self.ap.memory.memory_backend_params)
if self.ap.memory.memory_backend_params.run_type != 'trainer':
self.memory.set_memory_backend(self.memory_backend)
if self.shared_memory and self.is_chief:
self.shared_memory_scratchpad.add(self.memory_lookup_name, self.memory)
# set devices
if type(agent_parameters.task_parameters) == DistributedTaskParameters:
self.has_global = True
self.replicated_device = agent_parameters.task_parameters.device
self.worker_device = "/job:worker/task:{}".format(self.task_id)
if agent_parameters.task_parameters.use_cpu:
self.worker_device += "/cpu:0"
else:
self.worker_device += "/device:GPU:0"
else:
self.has_global = False
self.replicated_device = None
if agent_parameters.task_parameters.use_cpu:
self.worker_device = Device(DeviceType.CPU)
else:
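# expose one Device entry per available GPU, so that multiple workers can be distributed across the GPUs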
self.worker_device = [Device(DeviceType.GPU, i)
for i in range(agent_parameters.task_parameters.num_gpu)]
# filters
self.input_filter = self.ap.input_filter
self.input_filter.set_name('input_filter')
self.output_filter = self.ap.output_filter
self.output_filter.set_name('output_filter')
self.pre_network_filter = self.ap.pre_network_filter
self.pre_network_filter.set_name('pre_network_filter')
device = self.replicated_device if self.replicated_device else self.worker_device
# TODO-REMOVE This is a temporary flow divided into 3 modes. To be converged to a single flow once distributed TF
# is removed, and Redis is used for sharing data between local workers.
# The filters' mode of work will be split between the different configurations:
# 1. Distributed Coach synchronization type (=distributed across multiple nodes) - Redis-based data sharing + numpy arithmetic backend
# 2. Distributed TF (=distributed on a single node, using distributed TF) - TF for both data sharing and arithmetic backend
# 3. Single worker (=both TF and MXNet) - no data sharing needed + numpy arithmetic backend
if hasattr(self.ap.memory, 'memory_backend_params') and self.ap.algorithm.distributed_coach_synchronization_type:
self.input_filter.set_device(device, memory_backend_params=self.ap.memory.memory_backend_params, mode='numpy')
self.output_filter.set_device(device, memory_backend_params=self.ap.memory.memory_backend_params, mode='numpy')
self.pre_network_filter.set_device(device, memory_backend_params=self.ap.memory.memory_backend_params, mode='numpy')
elif (type(agent_parameters.task_parameters) == DistributedTaskParameters and
agent_parameters.task_parameters.framework_type == Frameworks.tensorflow):
self.input_filter.set_device(device, mode='tf')
self.output_filter.set_device(device, mode='tf')
self.pre_network_filter.set_device(device, mode='tf')
else:
self.input_filter.set_device(device, mode='numpy')
self.output_filter.set_device(device, mode='numpy')
self.pre_network_filter.set_device(device, mode='numpy')
# initialize all internal variables
self._phase = RunPhase.HEATUP
self.total_shaped_reward_in_current_episode = 0
self.total_reward_in_current_episode = 0
self.total_steps_counter = 0
self.running_reward = None
self.training_iteration = 0
self.training_epoch = 0
self.last_target_network_update_step = 0
self.last_training_phase_step = 0
self.current_episode = self.ap.current_episode = 0
self.curr_state = {}
self.current_hrl_goal = None
self.current_episode_steps_counter = 0
self.episode_running_info = {}
self.last_episode_evaluation_ran = 0
self.running_observations = []
self.agent_logger.set_current_time(self.current_episode)
self.exploration_policy = None
self.networks = {}
self.last_action_info = None
self.running_observation_stats = None
self.running_reward_stats = None
self.accumulated_rewards_across_evaluation_episodes = 0
self.accumulated_shaped_rewards_across_evaluation_episodes = 0
self.num_successes_across_evaluation_episodes = 0
self.num_evaluation_episodes_completed = 0
self.current_episode_buffer = Episode(discount=self.ap.algorithm.discount, n_step=self.ap.algorithm.n_step)
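# buffer accumulating the transitions of the episode currently being played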
# TODO: add agents observation rendering for debugging purposes (not the same as the environment rendering)
# environment parameters
self.spaces = None
self.in_action_space = self.ap.algorithm.in_action_space
# signals
self.episode_signals = []
self.step_signals = []
self.loss = self.register_signal('Loss')
self.curr_learning_rate = self.register_signal('Learning Rate')
self.unclipped_grads = self.register_signal('Grads (unclipped)')
self.reward = self.register_signal('Reward', dump_one_value_per_episode=False, dump_one_value_per_step=True)
self.shaped_reward = self.register_signal('Shaped Reward', dump_one_value_per_episode=False, dump_one_value_per_step=True)
self.discounted_return = self.register_signal('Discounted Return')
if isinstance(self.in_action_space, GoalsSpace):
self.distance_from_goal = self.register_signal('Distance From Goal', dump_one_value_per_step=True)
# batch rl
self.ope_manager = OpeManager() if self.ap.is_batch_rl_training else None
@property
def parent(self) -> 'LevelManager':
"""
Get the parent of the agent
:return: the agent's parent object
"""
return self._parent
@parent.setter
def parent(self, val) -> None:
"""
Change the parent of the agent.
Additionally, updates the full name of the agent
:param val: the new parent
:return: None
"""
self._parent = val
if self._parent is not None:
if not hasattr(self._parent, 'name'):
raise ValueError("The parent of an agent must have a name")
self.full_name_id = self.ap.full_name_id = "{}/{}".format(self._parent.name, self.name)
def setup_logger(self) -> None:
"""
Setup the logger for the agent
:return: None
"""
# dump documentation
logger_prefix = "{graph_name}.{level_name}.{agent_full_id}".\
format(graph_name=self.parent_level_manager.parent_graph_manager.name,
level_name=self.parent_level_manager.name,
agent_full_id='.'.join(self.full_name_id.split('/')))
self.agent_logger.set_index_name(self.parent_level_manager.parent_graph_manager.time_metric.value.name)
self.agent_logger.set_logger_filenames(self.ap.task_parameters.experiment_path, logger_prefix=logger_prefix,
add_timestamp=True, task_id=self.task_id)
if self.ap.visualization.dump_in_episode_signals:
self.agent_episode_logger.set_logger_filenames(self.ap.task_parameters.experiment_path,
logger_prefix=logger_prefix,
add_timestamp=True, task_id=self.task_id)
def set_session(self, sess) -> None:
"""
Set the deep learning framework session for the agent's filters and networks
:return: None
"""
self.input_filter.set_session(sess)
self.output_filter.set_session(sess)
self.pre_network_filter.set_session(sess)
[network.set_session(sess) for network in self.networks.values()]
self.initialize_session_dependent_components()
def initialize_session_dependent_components(self):
"""
Initialize components which require a session as part of their initialization.
:return: None
"""
# Loading a memory from a CSV file, requires an input filter to filter through the data.
# The filter needs a session before it can be used.
if self.ap.memory.load_memory_from_file_path:
self.load_memory_from_file()
def load_memory_from_file(self):
"""
Load memory transitions from a file.
:return: None
"""
if isinstance(self.ap.memory.load_memory_from_file_path, PickledReplayBuffer):
screen.log_title("Loading a pickled replay buffer. Pickled file path: {}"
.format(self.ap.memory.load_memory_from_file_path.filepath))
self.memory.load_pickled(self.ap.memory.load_memory_from_file_path.filepath)
elif isinstance(self.ap.memory.load_memory_from_file_path, CsvDataset):
screen.log_title("Loading a replay buffer from a CSV file. CSV file path: {}"
.format(self.ap.memory.load_memory_from_file_path.filepath))
self.memory.load_csv(self.ap.memory.load_memory_from_file_path, self.input_filter)
else:
raise ValueError('Trying to load a replay buffer using an unsupported method - {}. '
.format(self.ap.memory.load_memory_from_file_path))
def register_signal(self, signal_name: str, dump_one_value_per_episode: bool=True,
dump_one_value_per_step: bool=False) -> Signal:
"""
Register a signal such that its statistics will be dumped and be viewable through dashboard
:param signal_name: the name of the signal as it will appear in dashboard
:param dump_one_value_per_episode: should the signal value be written for each episode?
:param dump_one_value_per_step: should the signal value be written for each step?
:return: the created signal
"""
signal = Signal(signal_name)
if dump_one_value_per_episode:
self.episode_signals.append(signal)
if dump_one_value_per_step:
self.step_signals.append(signal)
return signal
def set_environment_parameters(self, spaces: SpacesDefinition):
"""
Sets the parameters that are environment dependent. As a side effect, initializes all the components that are
dependent on those values, by calling init_environment_dependent_modules
:param spaces: the environment spaces definition
:return: None
"""
self.spaces = copy.deepcopy(spaces)
if self.ap.algorithm.use_accumulated_reward_as_measurement:
if 'measurements' in self.spaces.state.sub_spaces:
self.spaces.state['measurements'].shape += 1
self.spaces.state['measurements'].measurements_names += ['accumulated_reward']
else:
self.spaces.state['measurements'] = VectorObservationSpace(1, measurements_names=['accumulated_reward'])
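# each observation space is filtered through the input filter and then the pre-network filter, matching the order in which observations are filtered at runtime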
for observation_name in self.spaces.state.sub_spaces.keys():
self.spaces.state[observation_name] = \
self.pre_network_filter.get_filtered_observation_space(observation_name,
self.input_filter.get_filtered_observation_space(observation_name,
self.spaces.state[observation_name]))
self.spaces.reward = self.pre_network_filter.get_filtered_reward_space(
self.input_filter.get_filtered_reward_space(self.spaces.reward))
self.spaces.action = self.output_filter.get_unfiltered_action_space(self.spaces.action)
if isinstance(self.in_action_space, GoalsSpace):
# TODO: what if the goal type is an embedding / embedding change?
self.spaces.goal = self.in_action_space
self.spaces.goal.set_target_space(self.spaces.state[self.spaces.goal.goal_name])
self.init_environment_dependent_modules()
def create_networks(self) -> Dict[str, NetworkWrapper]:
"""
Create all the networks of the agent.
The network creation will be done after setting the environment parameters for the agent, since they are needed
for creating the network.
:return: A dictionary mapping network names to the created networks
"""
networks = {}
for network_name in sorted(self.ap.network_wrappers.keys()):
networks[network_name] = NetworkWrapper(name=network_name,
agent_parameters=self.ap,
has_target=self.ap.network_wrappers[network_name].create_target_network,
has_global=self.has_global,
spaces=self.spaces,
replicated_device=self.replicated_device,
worker_device=self.worker_device)
if self.ap.visualization.print_networks_summary:
screen.print(networks[network_name])
return networks
def init_environment_dependent_modules(self) -> None:
"""
Initialize any modules that depend on knowing information about the environment such as the action space or
the observation space
:return: None
"""
# initialize exploration policy
if isinstance(self.ap.exploration, dict):
if self.spaces.action.__class__ in self.ap.exploration.keys():
self.ap.exploration = self.ap.exploration[self.spaces.action.__class__]
else:
raise ValueError("The exploration parameters were defined as a mapping between action space types and "
"exploration types, but the action space used by the environment ({}) was not part of "
"the exploration parameters dictionary keys ({})"
.format(self.spaces.action.__class__, list(self.ap.exploration.keys())))
self.ap.exploration.action_space = self.spaces.action
self.exploration_policy = dynamic_import_and_instantiate_module_from_params(self.ap.exploration)
# create all the networks of the agent
self.networks = self.create_networks()
@property
def phase(self) -> RunPhase:
"""
The current running phase of the agent
:return: RunPhase
"""
return self._phase
@phase.setter
def phase(self, val: RunPhase) -> None:
"""
Change the phase of the run for the agent and all the sub components
:param val: the new run phase (TRAIN, TEST, etc.)
:return: None
"""
self.reset_evaluation_state(val)
self._phase = val
self.exploration_policy.change_phase(val)
def reset_evaluation_state(self, val: RunPhase) -> None:
"""
Perform accumulators initialization when entering an evaluation phase, and signal dumping when exiting an
evaluation phase. Entering or exiting the evaluation phase is determined according to the new phase given
by val, and by the current phase set in self.phase.
:param val: The new phase to change to
:return: None
"""
starting_evaluation = (val == RunPhase.TEST)
ending_evaluation = (self.phase == RunPhase.TEST)
if starting_evaluation:
self.accumulated_rewards_across_evaluation_episodes = 0
self.accumulated_shaped_rewards_across_evaluation_episodes = 0
self.num_successes_across_evaluation_episodes = 0
self.num_evaluation_episodes_completed = 0
if self.ap.task_parameters.evaluate_only is None:
# TODO verbosity was mistakenly removed from task_parameters on release 0.11.0, need to bring it back
# if self.ap.is_a_highest_level_agent or self.ap.task_parameters.verbosity == "high":
if self.ap.is_a_highest_level_agent:
screen.log_title("{}: Starting evaluation phase".format(self.name))
elif ending_evaluation:
# we write to the next episode, because it could be that the current episode was already written
# to disk and then we won't write it again
self.agent_logger.set_current_time(self.get_current_time() + 1)
evaluation_reward = self.accumulated_rewards_across_evaluation_episodes / self.num_evaluation_episodes_completed
self.agent_logger.create_signal_value(
'Evaluation Reward', evaluation_reward)
self.agent_logger.create_signal_value(
'Shaped Evaluation Reward',
self.accumulated_shaped_rewards_across_evaluation_episodes / self.num_evaluation_episodes_completed)
success_rate = self.num_successes_across_evaluation_episodes / self.num_evaluation_episodes_completed
self.agent_logger.create_signal_value(
"Success Rate",
success_rate)
if self.ap.task_parameters.evaluate_only is None:
# TODO verbosity was mistakenly removed from task_parameters on release 0.11.0, need to bring it back
# if self.ap.is_a_highest_level_agent or self.ap.task_parameters.verbosity == "high":
if self.ap.is_a_highest_level_agent:
screen.log_title("{}: Finished evaluation phase. Success rate = {}, Avg Total Reward = {}"
.format(self.name, np.round(success_rate, 2), np.round(evaluation_reward, 2)))
def call_memory(self, func, args=()):
"""
This function is a wrapper to allow having the same calls for shared or unshared memories.
It should be used instead of calling the memory directly in order to allow different algorithms to work
both with a shared and a local memory.
:param func: the name of the memory function to call
:param args: the arguments to supply to the function
:return: the return value of the function
"""
if self.shared_memory:
result = self.shared_memory_scratchpad.internal_call(self.memory_lookup_name, func, args)
else:
if type(args) != tuple:
args = (args,)
result = getattr(self.memory, func)(*args)
return result
def log_to_screen(self) -> None:
"""
Write an episode summary line to the terminal
:return: None
"""
# log to screen
log = OrderedDict()
log["Name"] = self.full_name_id
if self.task_id is not None:
log["Worker"] = self.task_id
log["Episode"] = self.current_episode
log["Total reward"] = np.round(self.total_reward_in_current_episode, 2)
log["Exploration"] = np.round(self.exploration_policy.get_control_param(), 2)
log["Steps"] = self.total_steps_counter
log["Training iteration"] = self.training_iteration
screen.log_dict(log, prefix=self.phase.value)
def update_step_in_episode_log(self) -> None:
"""
Updates the in-episode log file with all the signal values from the most recent step.
:return: None
"""
# log all the signals to file
self.agent_episode_logger.set_current_time(self.current_episode_steps_counter)
self.agent_episode_logger.create_signal_value('Training Iter', self.training_iteration)
self.agent_episode_logger.create_signal_value('In Heatup', int(self._phase == RunPhase.HEATUP))
self.agent_episode_logger.create_signal_value('ER #Transitions', self.call_memory('num_transitions'))
self.agent_episode_logger.create_signal_value('ER #Episodes', self.call_memory('length'))
self.agent_episode_logger.create_signal_value('Total steps', self.total_steps_counter)
self.agent_episode_logger.create_signal_value("Epsilon", self.exploration_policy.get_control_param())
self.agent_episode_logger.create_signal_value("Shaped Accumulated Reward", self.total_shaped_reward_in_current_episode)
self.agent_episode_logger.create_signal_value('Update Target Network', 0, overwrite=False)
self.agent_episode_logger.update_wall_clock_time(self.current_episode_steps_counter)
for signal in self.step_signals:
self.agent_episode_logger.create_signal_value(signal.name, signal.get_last_value())
# dump
self.agent_episode_logger.dump_output_csv()
def update_log(self) -> None:
"""
Updates the episodic log file with all the signal values from the most recent episode.
Additional signals for logging can be set by the creating a new signal using self.register_signal,
and then updating it with some internal agent values.
:return: None
"""
# log all the signals to file
current_time = self.get_current_time()
self.agent_logger.set_current_time(current_time)
self.agent_logger.create_signal_value('Training Iter', self.training_iteration)
self.agent_logger.create_signal_value('Episode #', self.current_episode)
self.agent_logger.create_signal_value('Epoch', self.training_epoch)
self.agent_logger.create_signal_value('In Heatup', int(self._phase == RunPhase.HEATUP))
self.agent_logger.create_signal_value('ER #Transitions', self.call_memory('num_transitions'))
self.agent_logger.create_signal_value('ER #Episodes', self.call_memory('length'))
self.agent_logger.create_signal_value('Episode Length', self.current_episode_steps_counter)
self.agent_logger.create_signal_value('Total steps', self.total_steps_counter)
self.agent_logger.create_signal_value("Epsilon", np.mean(self.exploration_policy.get_control_param()))
self.agent_logger.create_signal_value("Shaped Training Reward", self.total_shaped_reward_in_current_episode
if self._phase == RunPhase.TRAIN else np.nan)
self.agent_logger.create_signal_value("Training Reward", self.total_reward_in_current_episode
if self._phase == RunPhase.TRAIN else np.nan)
self.agent_logger.create_signal_value('Update Target Network', 0, overwrite=False)
self.agent_logger.update_wall_clock_time(current_time)
# The following signals are created with meaningful values only when an evaluation phase is completed.
# Creating with default NaNs for any HEATUP/TRAIN/TEST episode which is not the last in an evaluation phase
self.agent_logger.create_signal_value('Evaluation Reward', np.nan, overwrite=False)
self.agent_logger.create_signal_value('Shaped Evaluation Reward', np.nan, overwrite=False)
self.agent_logger.create_signal_value('Success Rate', np.nan, overwrite=False)
self.agent_logger.create_signal_value('Inverse Propensity Score', np.nan, overwrite=False)
self.agent_logger.create_signal_value('Direct Method Reward', np.nan, overwrite=False)
self.agent_logger.create_signal_value('Doubly Robust', np.nan, overwrite=False)
self.agent_logger.create_signal_value('Weighted Importance Sampling', np.nan, overwrite=False)
self.agent_logger.create_signal_value('Sequential Doubly Robust', np.nan, overwrite=False)
for signal in self.episode_signals:
self.agent_logger.create_signal_value("{}/Mean".format(signal.name), signal.get_mean())
self.agent_logger.create_signal_value("{}/Stdev".format(signal.name), signal.get_stdev())
self.agent_logger.create_signal_value("{}/Max".format(signal.name), signal.get_max())
self.agent_logger.create_signal_value("{}/Min".format(signal.name), signal.get_min())
# dump
if self.current_episode % self.ap.visualization.dump_signals_to_csv_every_x_episodes == 0:
self.agent_logger.dump_output_csv()
def handle_episode_ended(self) -> None:
"""
Make any changes needed when each episode is ended.
This includes incrementing counters, updating full episode dependent values, updating logs, etc.
This function is called right after each episode is ended.
:return: None
"""
self.current_episode_buffer.is_complete = True
self.current_episode_buffer.update_transitions_rewards_and_bootstrap_data()
for transition in self.current_episode_buffer.transitions:
self.discounted_return.add_sample(transition.n_step_discounted_rewards)
if self.phase != RunPhase.TEST or self.ap.task_parameters.evaluate_only is not None:
self.current_episode += 1
if self.phase != RunPhase.TEST:
if isinstance(self.memory, EpisodicExperienceReplay):
if self.ap.algorithm.override_episode_rewards_with_the_last_transition_reward:
for t in self.current_episode_buffer.transitions:
t.reward = self.current_episode_buffer.transitions[-1].reward
self.call_memory('store_episode', self.current_episode_buffer)
elif self.ap.algorithm.store_transitions_only_when_episodes_are_terminated:
for transition in self.current_episode_buffer.transitions:
self.call_memory('store', transition)
if self.phase == RunPhase.TEST:
self.accumulated_rewards_across_evaluation_episodes += self.total_reward_in_current_episode
self.accumulated_shaped_rewards_across_evaluation_episodes += self.total_shaped_reward_in_current_episode
self.num_evaluation_episodes_completed += 1
if self.spaces.reward.reward_success_threshold and \
self.total_reward_in_current_episode >= self.spaces.reward.reward_success_threshold:
self.num_successes_across_evaluation_episodes += 1
if self.ap.visualization.dump_csv and \
self.parent_level_manager.parent_graph_manager.time_metric == TimeTypes.EpisodeNumber:
self.update_log()
# TODO verbosity was mistakenly removed from task_parameters on release 0.11.0, need to bring it back
# if self.ap.is_a_highest_level_agent or self.ap.task_parameters.verbosity == "high":
if self.ap.is_a_highest_level_agent:
self.log_to_screen()
def reset_internal_state(self) -> None:
"""
Reset all the episodic parameters. This function is called right before each episode starts.
:return: None
"""
for signal in self.episode_signals:
signal.reset()
for signal in self.step_signals:
signal.reset()
self.agent_episode_logger.set_episode_idx(self.current_episode)
self.total_shaped_reward_in_current_episode = 0
self.total_reward_in_current_episode = 0
self.curr_state = {}
self.current_episode_steps_counter = 0
self.episode_running_info = {}
self.current_episode_buffer = Episode(discount=self.ap.algorithm.discount, n_step=self.ap.algorithm.n_step)
if self.exploration_policy:
self.exploration_policy.reset()
self.input_filter.reset()
self.output_filter.reset()
self.pre_network_filter.reset()
if isinstance(self.memory, EpisodicExperienceReplay):
self.call_memory('verify_last_episode_is_closed')
for network in self.networks.values():
network.online_network.reset_internal_memory()
def learn_from_batch(self, batch) -> Tuple[float, List, List]:
"""
Given a batch of transitions, calculates their target values and updates the network.
:param batch: A list of transitions
:return: The total loss of the training, the loss per head and the unclipped gradients
"""
return 0, [], []
def _should_update_online_weights_to_target(self):
"""
Determine if online weights should be copied to the target.
:return: boolean: True if the online weights should be copied to the target.
"""
# update the target network of every network that has a target network
step_method = self.ap.algorithm.num_steps_between_copying_online_weights_to_target
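# the same counter (self.last_target_network_update_step) tracks either training iterations or environment steps, depending on whether the schedule is given in TrainingSteps or EnvironmentSteps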
if step_method.__class__ == TrainingSteps:
should_update = (self.training_iteration - self.last_target_network_update_step) >= step_method.num_steps
if should_update:
self.last_target_network_update_step = self.training_iteration
elif step_method.__class__ == EnvironmentSteps:
should_update = (self.total_steps_counter - self.last_target_network_update_step) >= step_method.num_steps
if should_update:
self.last_target_network_update_step = self.total_steps_counter
else:
raise ValueError("The num_steps_between_copying_online_weights_to_target parameter should be either "
"EnvironmentSteps or TrainingSteps. Instead it is {}".format(step_method.__class__))
return should_update
def _should_train(self):
"""
Determine if we should start a training phase according to the number of steps passed since the last training
:return: boolean: True if we should start a training phase
"""
should_update = self._should_update()
steps = self.ap.algorithm.num_consecutive_playing_steps
if should_update:
if steps.__class__ == EnvironmentEpisodes:
self.last_training_phase_step = self.current_episode
if steps.__class__ == EnvironmentSteps:
self.last_training_phase_step = self.total_steps_counter
return should_update
def _should_update(self):
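# checks whether enough episodes / environment steps were played since the last training phase, and that the memory is not empty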
wait_for_full_episode = self.ap.algorithm.act_for_full_episodes
steps = self.ap.algorithm.num_consecutive_playing_steps
if steps.__class__ == EnvironmentEpisodes:
should_update = (self.current_episode - self.last_training_phase_step) >= steps.num_steps
should_update = should_update and self.call_memory('length') > 0
elif steps.__class__ == EnvironmentSteps:
should_update = (self.total_steps_counter - self.last_training_phase_step) >= steps.num_steps
should_update = should_update and self.call_memory('num_transitions') > 0
if wait_for_full_episode:
should_update = should_update and self.current_episode_buffer.is_complete
else:
raise ValueError("The num_consecutive_playing_steps parameter should be either "
"EnvironmentSteps or Episodes. Instead it is {}".format(steps.__class__))
return should_update
def train(self) -> float:
"""
Check if a training phase should be done as configured by num_consecutive_playing_steps.
If it should, then do several training steps as configured by num_consecutive_training_steps.
A single training iteration: Sample a batch, train on it and update target networks.
:return: The total training loss during the training iterations.
"""
loss = 0
if self._should_train():
if self.ap.is_batch_rl_training:
# when an agent is trained just to generate a dataset for batch-rl, its training should not be counted as
# part of the training epochs. training epochs are only tracked for batch-rl training anyway.
self.training_epoch += 1
for network in self.networks.values():
network.set_is_training(True)
# At the moment we only support a single batch size for all the networks
networks_parameters = list(self.ap.network_wrappers.values())
assert all(net.batch_size == networks_parameters[0].batch_size for net in networks_parameters)
batch_size = networks_parameters[0].batch_size
# we either go sequentially through the entire replay buffer in the batch RL mode,
# or sample randomly for the basic RL case.
training_schedule = self.call_memory('get_shuffled_training_data_generator', batch_size) if \
self.ap.is_batch_rl_training else [self.call_memory('sample', batch_size) for _ in
range(self.ap.algorithm.num_consecutive_training_steps)]
for batch in training_schedule:
# update counters
self.training_iteration += 1
if self.pre_network_filter is not None:
update_internal_state = self.ap.algorithm.update_pre_network_filters_state_on_train
batch = self.pre_network_filter.filter(batch, update_internal_state=update_internal_state, deep_copy=False)
# if the batch returned empty then there are not enough samples in the replay buffer -> skip
# training step
if len(batch) > 0:
# train
batch = Batch(batch)
total_loss, losses, unclipped_grads = self.learn_from_batch(batch)
loss += total_loss
self.unclipped_grads.add_sample(unclipped_grads)
# TODO: this only deals with the main network (if exists), need to do the same for other networks
# for instance, for DDPG, the LR signal is currently not shown. Probably should be done through the
# network directly instead of here
# decay learning rate
if 'main' in self.ap.network_wrappers and \
self.ap.network_wrappers['main'].learning_rate_decay_rate != 0:
self.curr_learning_rate.add_sample(self.networks['main'].sess.run(
self.networks['main'].online_network.current_learning_rate))
else:
self.curr_learning_rate.add_sample(networks_parameters[0].learning_rate)
if any([network.has_target for network in self.networks.values()]) \
and self._should_update_online_weights_to_target():
for network in self.networks.values():
network.update_target_network(self.ap.algorithm.rate_for_copying_weights_to_target)
self.agent_logger.create_signal_value('Update Target Network', 1)
else:
self.agent_logger.create_signal_value('Update Target Network', 0, overwrite=False)
self.loss.add_sample(loss)
if self.imitation:
self.log_to_screen()
if self.ap.visualization.dump_csv and \
self.parent_level_manager.parent_graph_manager.time_metric == TimeTypes.Epoch:
# in BatchRL, or imitation learning, the agent never acts, so we have to get the stats out here.
# we dump the data out every epoch
self.update_log()
for network in self.networks.values():
network.set_is_training(False)
# run additional commands after the training is done
self.post_training_commands()
return loss
def choose_action(self, curr_state):
"""
choose an action to act with in the current episode being played. Different behavior might be exhibited when
training or testing.
:param curr_state: the current state to act upon.
:return: chosen action, some action value describing the action (q-value, probability, etc)
"""
pass
def prepare_batch_for_inference(self, states: Union[Dict[str, np.ndarray], List[Dict[str, np.ndarray]]],
network_name: str) -> Dict[str, np.array]:
"""
Convert curr_state into the input tensors TensorFlow is expecting, i.e. if we have several input states, stack all
observations together, measurements together, etc.
:param states: A list of environment states, where each one is a dict mapping from an observation name to its
corresponding observation
:param network_name: The agent network name to prepare the batch for. This is needed in order to extract only
the observations relevant for the network from the states.
:return: A dictionary containing a list of values from all the given states for each of the observations
"""
# convert to batch so we can run it through the network
states = force_list(states)
batches_dict = {}
for key in self.ap.network_wrappers[network_name].input_embedders_parameters.keys():
# there are cases (e.g. ddpg) where the state does not contain all the information needed for running
# through the network and this has to be added externally (e.g. ddpg where the action needs to be given in
# addition to the current_state, so that all the inputs of the network will be filled)
if key in states[0].keys():
batches_dict[key] = np.array([np.array(state[key]) for state in states])
return batches_dict
def act(self, action: Union[None, ActionType]=None) -> ActionInfo:
"""
Given the agents current knowledge, decide on the next action to apply to the environment
:param action: An action to take, overriding whatever the current policy is
:return: An ActionInfo object, which contains the action and any additional info from the action decision process
"""
if self.phase == RunPhase.TRAIN and self.ap.algorithm.num_consecutive_playing_steps.num_steps == 0:
# This agent never plays while training (e.g. behavioral cloning)
return None
# count steps (only when training or if we are in the evaluation worker)
if self.phase != RunPhase.TEST or self.ap.task_parameters.evaluate_only is not None:
self.total_steps_counter += 1
self.current_episode_steps_counter += 1
# decide on the action
if action is None:
if self.phase == RunPhase.HEATUP and not self.ap.algorithm.heatup_using_network_decisions:
# random action
action = self.spaces.action.sample_with_info()
else:
# informed action
if self.pre_network_filter is not None:
# before choosing an action, first use the pre_network_filter to filter out the current state
update_filter_internal_state = self.ap.algorithm.update_pre_network_filters_state_on_inference and \
self.phase is not RunPhase.TEST
curr_state = self.run_pre_network_filter_for_inference(self.curr_state, update_filter_internal_state)
else:
curr_state = self.curr_state
action = self.choose_action(curr_state)
assert isinstance(action, ActionInfo)
self.last_action_info = action
# output filters are explicitly applied after recording self.last_action_info. This is
# because the output filters may change the representation of the action so that the agent
can no longer use the transition in its replay buffer. It is possible that these filters
# could be moved to the environment instead, but they are here now for historical reasons.
filtered_action_info = self.output_filter.filter(self.last_action_info)
return filtered_action_info
def run_pre_network_filter_for_inference(self, state: StateType, update_filter_internal_state: bool=True)\
-> StateType:
"""
Run the filters which were defined to be applied right before using the state for inference.
:param state: The state to run the filters on
:param update_filter_internal_state: Whether to update the filter's internal state (should not be updated when evaluating)
:return: The filtered state
"""
dummy_env_response = EnvResponse(next_state=state, reward=0, game_over=False)
# TODO actually we only want to run the observation filters. No point in running the reward filters as the
filtered reward is being ignored anyway (and it might unnecessarily affect the reward filters' internal
# state).
return self.pre_network_filter.filter(dummy_env_response,
update_internal_state=update_filter_internal_state)[0].next_state
def get_state_embedding(self, state: dict) -> np.ndarray:
"""
Given a state, get the corresponding state embedding from the main network
:param state: a state dict
:return: a numpy embedding vector
"""
# TODO: this won't work anymore
# TODO: instead of the state embedding (which contains the goal) we should use the observation embedding
embedding = self.networks['main'].online_network.predict(
self.prepare_batch_for_inference(state, "main"),
outputs=self.networks['main'].online_network.state_embedding)
return embedding
def update_transition_before_adding_to_replay_buffer(self, transition: Transition) -> Transition:
"""
Allows agents to update the transition just before adding it to the replay buffer.
Can be useful for agents that want to tweak the reward, termination signal, etc.
:param transition: the transition to update
:return: the updated transition
"""
return transition
def observe(self, env_response: EnvResponse) -> bool:
"""
Given a response from the environment, distill the observation from it and store it for later use.
The response contains the new observation and measurements, the reward, a game over flag and any additional
information necessary.
:param env_response: result of call from environment.step(action)
:return: a boolean value which determines if the agent has decided to terminate the episode after seeing the
given observation
"""
# filter the env_response
filtered_env_response = self.input_filter.filter(env_response)[0]
# inject agent collected statistics, if required
if self.ap.algorithm.use_accumulated_reward_as_measurement:
if 'measurements' in filtered_env_response.next_state:
filtered_env_response.next_state['measurements'] = np.append(filtered_env_response.next_state['measurements'],
self.total_shaped_reward_in_current_episode)
else:
filtered_env_response.next_state['measurements'] = np.array([self.total_shaped_reward_in_current_episode])
# if this is the first step in the episode, there is no previous state and action to pair with this
# response, and thus no transition yet, so there is nothing to store in the memory.
# also we did not reach the goal yet.
if self.current_episode_steps_counter == 0:
# initialize the current state
self.curr_state = filtered_env_response.next_state
return env_response.game_over
else:
transition = Transition(state=copy.copy(self.curr_state), action=self.last_action_info.action,
reward=filtered_env_response.reward, next_state=filtered_env_response.next_state,
game_over=filtered_env_response.game_over, info=filtered_env_response.info)
# now that we have formed a basic transition - the next state progresses to be the current state
self.curr_state = filtered_env_response.next_state
# make agent specific changes to the transition if needed
transition = self.update_transition_before_adding_to_replay_buffer(transition)
# add action info to transition
if type(self.parent).__name__ == 'CompositeAgent':
transition.add_info(self.parent.last_action_info.__dict__)
else:
transition.add_info(self.last_action_info.__dict__)
self.total_reward_in_current_episode += env_response.reward
self.reward.add_sample(env_response.reward)
return self.observe_transition(transition)
def observe_transition(self, transition):
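# accumulates the shaped reward for the transition and, in TRAIN/HEATUP phases, stores it in the current episode buffer and possibly directly in the memory; returns whether the episode has ended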
# sum up the total shaped reward
self.total_shaped_reward_in_current_episode += transition.reward
self.shaped_reward.add_sample(transition.reward)
# create and store the transition
if self.phase in [RunPhase.TRAIN, RunPhase.HEATUP]:
# for episodic memories we keep the transitions in a local buffer until the episode is ended.
# for regular memories we insert the transitions directly to the memory
self.current_episode_buffer.insert(transition)
if not isinstance(self.memory, EpisodicExperienceReplay) \
and not self.ap.algorithm.store_transitions_only_when_episodes_are_terminated:
self.call_memory('store', transition)
if self.ap.visualization.dump_in_episode_signals:
self.update_step_in_episode_log()
return transition.game_over
def post_training_commands(self) -> None:
"""
A function which allows adding any functionality that is required to run right after the training phase ends.
:return: None
"""
pass
def get_predictions(self, states: List[Dict[str, np.ndarray]], prediction_type: PredictionType):
"""
Get a prediction from the agent with regard to the requested prediction_type.
If the agent cannot predict this prediction_type, or if there is more than one possible way to do so,
raise a ValueError.
:param states: The states to get a prediction for
:param prediction_type: The type of prediction to get for the states. For example, the state-value prediction.
:return: the predicted values
"""
predictions = self.networks['main'].online_network.predict_with_prediction_type(
# states=self.dict_state_to_batches_dict(states, 'main'), prediction_type=prediction_type)
states=states, prediction_type=prediction_type)
if len(predictions.keys()) != 1:
raise ValueError("The network has more than one component {} matching the requested prediction_type {}. ".
format(list(predictions.keys()), prediction_type))
return list(predictions.values())[0]
def set_incoming_directive(self, action: ActionType) -> None:
"""
Allows setting a directive for the agent to follow. This is useful in hierarchy structures, where the agent
has another master agent that is controlling it. In such cases, the master agent can define the goals for the
slave agent, define its observation, possible actions, etc. The directive type is defined by the agent
in-action-space.
:param action: The action that should be set as the directive
:return:
"""
if isinstance(self.in_action_space, GoalsSpace):
self.current_hrl_goal = action
elif isinstance(self.in_action_space, AttentionActionSpace):
self.input_filter.observation_filters['attention'].crop_low = action[0]
self.input_filter.observation_filters['attention'].crop_high = action[1]
self.output_filter.action_filters['masking'].set_masking(action[0], action[1])
def save_checkpoint(self, checkpoint_prefix: str) -> None:
"""
Allows agents to store additional information when saving checkpoints.
:param checkpoint_prefix: The prefix of the checkpoint file to save
:return: None
"""
checkpoint_dir = self.ap.task_parameters.checkpoint_save_dir
checkpoint_prefix = '.'.join([checkpoint_prefix] + self.full_name_id.split('/')) # adds both level name and agent name
self.input_filter.save_state_to_checkpoint(checkpoint_dir, checkpoint_prefix)
self.output_filter.save_state_to_checkpoint(checkpoint_dir, checkpoint_prefix)
self.pre_network_filter.save_state_to_checkpoint(checkpoint_dir, checkpoint_prefix)
def restore_checkpoint(self, checkpoint_dir: str) -> None:
"""
Allows agents to restore additional information when restoring from a checkpoint.
:param checkpoint_dir: The checkpoint dir to restore from
:return: None
"""
checkpoint_prefix = '.'.join(self.full_name_id.split('/')) # adds both level name and agent name
self.input_filter.restore_state_from_checkpoint(checkpoint_dir, checkpoint_prefix)
self.pre_network_filter.restore_state_from_checkpoint(checkpoint_dir, checkpoint_prefix)
# no output filters currently have an internal state to restore
# self.output_filter.restore_state_from_checkpoint(checkpoint_dir)
def sync(self) -> None:
"""
Sync the global network parameters to local networks
:return: None
"""
for network in self.networks.values():
network.sync()
def get_success_rate(self) -> float:
return self.num_successes_across_evaluation_episodes / self.num_evaluation_episodes_completed
def collect_savers(self, parent_path_suffix: str) -> SaverCollection:
"""
Collect all of agent's network savers
:param parent_path_suffix: path suffix of the parent of the agent
(could be name of level manager or composite agent)
:return: collection of all agent savers
"""
parent_path_suffix = "{}.{}".format(parent_path_suffix, self.name)
savers = SaverCollection()
for network in self.networks.values():
savers.update(network.collect_savers(parent_path_suffix))
return savers
def get_current_time(self):
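# the current time index, according to the graph manager's configured time metric (episode number, training iteration, environment steps, wall clock time or epoch)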
return {
TimeTypes.EpisodeNumber: self.current_episode,
TimeTypes.TrainingIteration: self.training_iteration,
TimeTypes.EnvironmentSteps: self.total_steps_counter,
TimeTypes.WallClockTime: self.agent_logger.get_current_wall_clock_time(),
TimeTypes.Epoch: self.training_epoch}[self.parent_level_manager.parent_graph_manager.time_metric]
def freeze_memory(self):
"""
Shuffle episodes in the memory and freeze it to make sure that no extra data is being pushed anymore.
:return: None
"""
self.call_memory('shuffle_episodes')
self.call_memory('freeze')