diff --git a/agents/actor_critic_agent.py b/agents/actor_critic_agent.py
index 9a93fef..8b198bf 100644
--- a/agents/actor_critic_agent.py
+++ b/agents/actor_critic_agent.py
@@ -20,6 +20,17 @@
 from utils import *
 import scipy.signal

+def last_sample(state):
+    """
+    given a batch of states, return the last sample of the batch with length 1
+    batch axis.
+    """
+    return {
+        k: np.expand_dims(v[-1], 0)
+        for k, v in state.items()
+    }
+
+
 # Actor Critic - https://arxiv.org/abs/1602.01783
 class ActorCriticAgent(PolicyOptimizationAgent):
     def __init__(self, env, tuning_parameters, replicated_device=None, thread_id=0, create_target_network = False):
@@ -76,7 +87,7 @@ class ActorCriticAgent(PolicyOptimizationAgent):
             if game_overs[-1]:
                 R = 0
             else:
-                R = self.main_network.online_network.predict(np.expand_dims(next_states[-1], 0))[0]
+                R = self.main_network.online_network.predict(last_sample(next_states))[0]

             for i in reversed(range(num_transitions)):
                 R = rewards[i] + self.tp.agent.discount * R
@@ -85,7 +96,7 @@ class ActorCriticAgent(PolicyOptimizationAgent):

         elif self.policy_gradient_rescaler == PolicyGradientRescaler.GAE:
             # get bootstraps
-            bootstrapped_value = self.main_network.online_network.predict(np.expand_dims(next_states[-1], 0))[0]
+            bootstrapped_value = self.main_network.online_network.predict(last_sample(next_states))[0]
             values = np.append(current_state_values, bootstrapped_value)
             if game_overs[-1]:
                 values[-1] = 0
@@ -101,7 +112,9 @@ class ActorCriticAgent(PolicyOptimizationAgent):
             actions = np.expand_dims(actions, -1)

         # train
-        result = self.main_network.online_network.accumulate_gradients([current_states, actions],
+        inputs = copy.copy(current_states)
+        inputs['output_1_0'] = actions
+        result = self.main_network.online_network.accumulate_gradients(inputs,
                                                                        [state_value_head_targets, action_advantages])

         # logging
@@ -114,11 +127,17 @@ class ActorCriticAgent(PolicyOptimizationAgent):
         return total_loss

     def choose_action(self, curr_state, phase=RunPhase.TRAIN):
+        # TODO: rename curr_state -> state
+
         # convert to batch so we can run it through the network
-        observation = np.expand_dims(np.array(curr_state['observation']), 0)
+        curr_state = {
+            k: np.expand_dims(np.array(curr_state[k]), 0)
+            for k in curr_state.keys()
+        }
+
         if self.env.discrete_controls:
             # DISCRETE
-            state_value, action_probabilities = self.main_network.online_network.predict(observation)
+            state_value, action_probabilities = self.main_network.online_network.predict(curr_state)
             action_probabilities = action_probabilities.squeeze()
             if phase == RunPhase.TRAIN:
                 action = self.exploration_policy.get_action(action_probabilities)
@@ -128,7 +147,7 @@ class ActorCriticAgent(PolicyOptimizationAgent):
             self.entropy.add_sample(-np.sum(action_probabilities * np.log(action_probabilities + eps)))
         else:
             # CONTINUOUS
-            state_value, action_values_mean, action_values_std = self.main_network.online_network.predict(observation)
+            state_value, action_values_mean, action_values_std = self.main_network.online_network.predict(curr_state)
             action_values_mean = action_values_mean.squeeze()
             action_values_std = action_values_std.squeeze()
             if phase == RunPhase.TRAIN:
diff --git a/agents/agent.py b/agents/agent.py
index 274649e..df2f21c 100644
--- a/agents/agent.py
+++ b/agents/agent.py
@@ -93,7 +93,7 @@ class Agent(object):
         self.running_reward = None
         self.training_iteration = 0
         self.current_episode = self.tp.current_episode = 0
-        self.curr_state = []
+        self.curr_state = {}
         self.current_episode_steps_counter = 0
         self.episode_running_info = {}
         self.last_episode_evaluation_ran = 0
@@ -194,7 +194,7 @@ class Agent(object):
         for signal in self.signals:
             signal.reset()
         self.total_reward_in_current_episode = 0
-        self.curr_state = []
+        self.curr_state = {}
         self.last_episode_images = []
         self.current_episode_steps_counter = 0
         self.episode_running_info = {}
@@ -289,23 +289,20 @@
         :param batch: An array of transitions
         :return: For each transition element, returns a numpy array of all the transitions in the batch
         """
+        current_states = {}
+        next_states = {}

-        current_observations = np.array([transition.state['observation'] for transition in batch])
-        next_observations = np.array([transition.next_state['observation'] for transition in batch])
+        current_states['observation'] = np.array([transition.state['observation'] for transition in batch])
+        next_states['observation'] = np.array([transition.next_state['observation'] for transition in batch])
         actions = np.array([transition.action for transition in batch])
         rewards = np.array([transition.reward for transition in batch])
         game_overs = np.array([transition.game_over for transition in batch])
         total_return = np.array([transition.total_return for transition in batch])

-        current_states = current_observations
-        next_states = next_observations
-
         # get the entire state including measurements if available
         if self.tp.agent.use_measurements:
-            current_measurements = np.array([transition.state['measurements'] for transition in batch])
-            next_measurements = np.array([transition.next_state['measurements'] for transition in batch])
-            current_states = [current_observations, current_measurements]
-            next_states = [next_observations, next_measurements]
+            current_states['measurements'] = np.array([transition.state['measurements'] for transition in batch])
+            next_states['measurements'] = np.array([transition.next_state['measurements'] for transition in batch])

         return current_states, next_states, actions, rewards, game_overs, total_return

@@ -353,12 +350,24 @@

         # get new action
         action_info = {"action_probability": 1.0 / self.env.action_space_size, "action_value": 0}
-        is_first_transition_in_episode = (self.curr_state == [])
+        is_first_transition_in_episode = (self.curr_state == {})
         if is_first_transition_in_episode:
-            observation = self.preprocess_observation(self.env.observation)
-            observation = stack_observation([], observation, self.tp.env.observation_stack_size)
+            if not isinstance(self.env.state, dict):
+                raise ValueError((
+                    'expected state to be a dictionary, found {}'
+                ).format(type(self.env.state)))

-            self.curr_state = {'observation': observation}
+            state = self.env.state
+            # TODO: modify preprocess_observation to modify the entire state
+            # for now, only preprocess the observation
+            state['observation'] = self.preprocess_observation(state['observation'])
+
+            # TODO: provide option to stack more than just the observation
+            # TODO: this should probably be happening in an environment wrapper anyway
+            state['observation'] = stack_observation([], state['observation'], self.tp.env.observation_stack_size)
+
+            self.curr_state = state
+            # TODO: this should be handled in the environment
             if self.tp.agent.use_measurements:
                 self.curr_state['measurements'] = self.env.measurements
                 if self.tp.agent.use_accumulated_reward_as_measurement:
@@ -373,22 +382,25 @@ class Agent(object):
         if type(action) == np.ndarray:
             action = action.squeeze()
         result = self.env.step(action)
+
         shaped_reward = self.preprocess_reward(result['reward'])
         if 'action_intrinsic_reward' in action_info.keys():
             shaped_reward += action_info['action_intrinsic_reward']
+        # TODO: should total_reward_in_current_episode include shaped_reward?
         self.total_reward_in_current_episode += result['reward']
-        observation = self.preprocess_observation(result['observation'])
+        next_state = result['state']
+        next_state['observation'] = self.preprocess_observation(next_state['observation'])

         # plot action values online
         if self.tp.visualization.plot_action_values_online and phase != RunPhase.HEATUP:
             self.plot_action_values_online()

         # initialize the next state
-        observation = stack_observation(self.curr_state['observation'], observation, self.tp.env.observation_stack_size)
+        # TODO: provide option to stack more than just the observation
+        next_state['observation'] = stack_observation(self.curr_state['observation'], next_state['observation'], self.tp.env.observation_stack_size)

-        next_state = {'observation': observation}
         if self.tp.agent.use_measurements and 'measurements' in result.keys():
-            next_state['measurements'] = result['measurements']
+            next_state['measurements'] = result['state']['measurements']
             if self.tp.agent.use_accumulated_reward_as_measurement:
                 next_state['measurements'] = np.append(next_state['measurements'], self.total_reward_in_current_episode)

diff --git a/agents/dfp_agent.py b/agents/dfp_agent.py
index 2205aa6..c055d2c 100644
--- a/agents/dfp_agent.py
+++ b/agents/dfp_agent.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2017 Intel Corporation 
+# Copyright (c) 2017 Intel Corporation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/architectures/tensorflow_components/architecture.py b/architectures/tensorflow_components/architecture.py
index 6a36bb8..689aa52 100644
--- a/architectures/tensorflow_components/architecture.py
+++ b/architectures/tensorflow_components/architecture.py
@@ -48,7 +48,7 @@ class TensorFlowArchitecture(Architecture):
         self.network_is_local = network_is_local
         assert tuning_parameters.agent.tensorflow_support, 'TensorFlow is not supported for this agent'
         self.sess = tuning_parameters.sess
-        self.inputs = []
+        self.inputs = {}
         self.outputs = []
         self.targets = []
         self.losses = []
@@ -106,7 +106,8 @@ class TensorFlowArchitecture(Architecture):
         # gradients of the outputs w.r.t. the inputs
         # at the moment, this is only used by ddpg
         if len(self.outputs) == 1:
-            self.gradients_wrt_inputs = [tf.gradients(self.outputs[0], input_ph) for input_ph in self.inputs]
+            # TODO: convert gradients_with_respect_to_inputs into dictionary?
+            self.gradients_wrt_inputs = [tf.gradients(self.outputs[0], input_ph) for input_ph in self.inputs.values()]
             self.gradients_weights_ph = tf.placeholder('float32', self.outputs[0].shape, 'output_gradient_weights')
             self.weighted_gradients = tf.gradients(self.outputs[0], self.trainable_weights, self.gradients_weights_ph)

@@ -169,9 +170,8 @@ class TensorFlowArchitecture(Architecture):
         # feed inputs
         if additional_fetches is None:
             additional_fetches = []
-        inputs = force_list(inputs)

-        feed_dict = dict(zip(self.inputs, inputs))
+        feed_dict = self._feed_dict(inputs)

         # feed targets
         targets = force_list(targets)
@@ -266,6 +266,12 @@ class TensorFlowArchitecture(Architecture):
         while self.tp.sess.run(self.release_counter) % self.tp.num_threads != 0:
             time.sleep(0.00001)

+    def _feed_dict(self, inputs):
+        return {
+            self.inputs[input_name]: input_value
+            for input_name, input_value in inputs.items()
+        }
+
     def predict(self, inputs, outputs=None):
         """
         Run a forward pass of the network using the given input
@@ -275,8 +281,8 @@ class TensorFlowArchitecture(Architecture):

         WARNING: must only call once per state since each call is assumed by LSTM to be a new time step.
         """
-
-        feed_dict = dict(zip(self.inputs, force_list(inputs)))
+        # TODO: rename self.inputs -> self.input_placeholders
+        feed_dict = self._feed_dict(inputs)

         if outputs is None:
             outputs = self.outputs
@@ -290,21 +296,21 @@ class TensorFlowArchitecture(Architecture):

         return squeeze_list(output)

-    def train_on_batch(self, inputs, targets, scaler=1., additional_fetches=None):
-        """
-        Given a batch of examples and targets, runs a forward pass & backward pass and then applies the gradients
-        :param additional_fetches: Optional tensors to fetch during the training process
-        :param inputs: The input for the network
-        :param targets: The targets corresponding to the input batch
-        :param scaler: A scaling factor that allows rescaling the gradients before applying them
-        :return: The loss of the network
-        """
-        if additional_fetches is None:
-            additional_fetches = []
-        force_list(additional_fetches)
-        loss = self.accumulate_gradients(inputs, targets, additional_fetches=additional_fetches)
-        self.apply_and_reset_gradients(self.accumulated_gradients, scaler)
-        return loss
+    # def train_on_batch(self, inputs, targets, scaler=1., additional_fetches=None):
+    #     """
+    #     Given a batch of examples and targets, runs a forward pass & backward pass and then applies the gradients
+    #     :param additional_fetches: Optional tensors to fetch during the training process
+    #     :param inputs: The input for the network
+    #     :param targets: The targets corresponding to the input batch
+    #     :param scaler: A scaling factor that allows rescaling the gradients before applying them
+    #     :return: The loss of the network
+    #     """
+    #     if additional_fetches is None:
+    #         additional_fetches = []
+    #     force_list(additional_fetches)
+    #     loss = self.accumulate_gradients(inputs, targets, additional_fetches=additional_fetches)
+    #     self.apply_and_reset_gradients(self.accumulated_gradients, scaler)
+    #     return loss

     def get_weights(self):
         """
diff --git a/architectures/tensorflow_components/general_network.py b/architectures/tensorflow_components/general_network.py
index e998dd7..e8befc2 100644
--- a/architectures/tensorflow_components/general_network.py
+++ b/architectures/tensorflow_components/general_network.py
@@ -112,7 +112,7 @@ class GeneralTensorFlowNetwork(TensorFlowArchitecture):
         ####################
         state_embedding = []

-        for idx, input_type in enumerate(self.tp.agent.input_types):
+        for input_name, input_type in self.tp.agent.input_types.items():
             # get the class of the input embedder
             input_embedder = self.get_input_embedder(input_type)
             self.input_embedders.append(input_embedder)
@@ -122,9 +122,9 @@ class GeneralTensorFlowNetwork(TensorFlowArchitecture):
             # the existing input_placeholders into the input_embedders.
             if network_idx == 0:
                 input_placeholder, embedding = input_embedder()
-                self.inputs.append(input_placeholder)
+                self.inputs[input_name] = input_placeholder
             else:
-                input_placeholder, embedding = input_embedder(self.inputs[idx])
+                input_placeholder, embedding = input_embedder(self.inputs[input_name])

             state_embedding.append(embedding)

@@ -159,13 +159,15 @@ class GeneralTensorFlowNetwork(TensorFlowArchitecture):

             # build the head
             if self.network_is_local:
-                output, target_placeholder, input_placeholder = self.output_heads[-1](head_input)
+                output, target_placeholder, input_placeholders = self.output_heads[-1](head_input)
                 self.targets.extend(target_placeholder)
             else:
-                output, input_placeholder = self.output_heads[-1](head_input)
+                output, input_placeholders = self.output_heads[-1](head_input)

             self.outputs.extend(output)
-            self.inputs.extend(input_placeholder)
+            # TODO: use head names as well
+            for placeholder_index, input_placeholder in enumerate(input_placeholders):
+                self.inputs['output_{}_{}'.format(head_idx, placeholder_index)] = input_placeholder

         # Losses
         self.losses = tf.losses.get_losses(self.name)
diff --git a/architectures/tensorflow_components/heads.py b/architectures/tensorflow_components/heads.py
index 2653d59..de6d2f8 100644
--- a/architectures/tensorflow_components/heads.py
+++ b/architectures/tensorflow_components/heads.py
@@ -250,7 +250,7 @@ class MeasurementsPredictionHead(Head):
                                             name='output')
             action_stream = tf.reshape(action_stream, (tf.shape(action_stream)[0], self.num_actions,
                                                        self.multi_step_measurements_size))
-            action_stream = action_stream - tf.reduce_mean(action_stream, reduction_indices=1, keep_dims=True)
+            action_stream = action_stream - tf.reduce_mean(action_stream, reduction_indices=1, keepdims=True)

             # merge to future measurements predictions
             self.output = tf.add(expectation_stream, action_stream, name='output')
@@ -302,7 +302,7 @@ class DNDQHead(Head):
         square_diff = tf.square(dnd_embeddings - tf.expand_dims(input_layer, 1))
         distances = tf.reduce_sum(square_diff, axis=2) + [self.l2_norm_added_delta]
         weights = 1.0 / distances
-        normalised_weights = weights / tf.reduce_sum(weights, axis=1, keep_dims=True)
+        normalised_weights = weights / tf.reduce_sum(weights, axis=1, keepdims=True)
         return tf.reduce_sum(dnd_values * normalised_weights, axis=1)


diff --git a/coach.py b/coach.py
index d3c7694..c1ef4aa 100644
--- a/coach.py
+++ b/coach.py
@@ -30,6 +30,14 @@ from subprocess import Popen
 import datetime
 import presets
 import atexit
+import sys
+import subprocess
+from threading import Thread
+
+try:
+    from Queue import Queue, Empty
+except ImportError:
+    from queue import Queue, Empty  # for Python 3.x

 if len(set(failed_imports)) > 0:
     screen.warning("Warning: failed to import the following packages - {}".format(', '.join(set(failed_imports))))
@@ -152,6 +160,38 @@ def run_dict_to_json(_run_dict, task_id=''):
     return json_path


+def enqueue_output(out, queue):
+    for line in iter(out.readline, b''):
+        queue.put(line)
+    out.close()
+
+
+def merge_streams(processes, output_stream=sys.stdout):
+    q = Queue()
+    threads = []
+    for p in processes:
+        threads.append(Thread(target=enqueue_output, args=(p.stdout, q)))
+        threads.append(Thread(target=enqueue_output, args=(p.stderr, q)))
+
+    for t in threads:
+        t.daemon = True
+        t.start()
+
+    while True:
+        try:
+            line = q.get_nowait()
+        except Empty:
+            # break when all processes are done and q is empty
+            if all(p.poll() is not None for p in processes):
+                break
+        else:
+            # sys.stdout.write(line)
+            output_stream.write(line.decode(output_stream.encoding))
+            output_stream.flush()
+
+    print('All processes done')
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument('-p', '--preset',
@@ -252,6 +292,8 @@ if __name__ == "__main__":
     if not args.no_summary:
         atexit.register(logger.print_summary)

+    set_cpu()
+
     # Single-threaded runs
     if run_dict['num_threads'] == 1:
         # set tuning parameters
@@ -285,11 +327,13 @@ if __name__ == "__main__":
         set_cpu()

         # create a parameter server
-        Popen(["python3",
-               "./parallel_actor.py",
-               "--ps_hosts={}".format(ps_hosts),
-               "--worker_hosts={}".format(worker_hosts),
-               "--job_name=ps"])
+        parameter_server = Popen([
+            "python3",
+            "./parallel_actor.py",
+            "--ps_hosts={}".format(ps_hosts),
+            "--worker_hosts={}".format(worker_hosts),
+            "--job_name=ps",
+        ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1)

         screen.log_title("*** Distributed Training ***")
         time.sleep(1)
@@ -314,13 +358,15 @@ if __name__ == "__main__":
                             "--job_name=worker",
                             "--load_json={}".format(json_run_dict_path)]

-            p = Popen(workers_args)
+            p = Popen(workers_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1)

             if i != run_dict['num_threads']:
                 workers.append(p)
             else:
                 evaluation_worker = p

+        merge_streams(workers + [parameter_server])
+
         # wait for all workers
         [w.wait() for w in workers]
         evaluation_worker.kill()
diff --git a/configurations.py b/configurations.py
index f929642..ddc5ce6 100644
--- a/configurations.py
+++ b/configurations.py
@@ -69,7 +69,7 @@ class Parameters(object):
                 parameters[k] = dict(v.items())
             else:
                 parameters[k] = v
-        
+
         return json.dumps(parameters, indent=4, default=repr)


@@ -77,7 +77,7 @@ class AgentParameters(Parameters):
     agent = ''

     # Architecture parameters
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.Q]
     middleware_type = MiddlewareTypes.FC
     loss_weights = [1.0]
@@ -327,7 +327,7 @@ class Human(AgentParameters):

 class NStepQ(AgentParameters):
     type = 'NStepQAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.Q]
     loss_weights = [1.0]
     optimizer_type = 'Adam'
@@ -343,7 +343,7 @@

 class DQN(AgentParameters):
     type = 'DQNAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.Q]
     loss_weights = [1.0]
     optimizer_type = 'Adam'
@@ -385,7 +385,7 @@ class QuantileRegressionDQN(DQN):
 class NEC(AgentParameters):
     type = 'NECAgent'
     optimizer_type = 'RMSProp'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.DNDQ]
     loss_weights = [1.0]
     dnd_size = 500000
@@ -399,7 +399,7 @@

 class ActorCritic(AgentParameters):
     type = 'ActorCriticAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.V, OutputTypes.Pi]
     loss_weights = [0.5, 1.0]
     stop_gradients_from_head = [False, False]
@@ -417,7 +417,7 @@

 class PolicyGradient(AgentParameters):
     type = 'PolicyGradientsAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.Pi]
     loss_weights = [1.0]
     num_episodes_in_experience_replay = 2
@@ -430,7 +430,7 @@

 class DDPG(AgentParameters):
     type = 'DDPGAgent'
-    input_types = [InputTypes.Observation, InputTypes.Action]
+    input_types = {'observation': InputTypes.Observation, 'action': InputTypes.Action}
     output_types = [OutputTypes.V] # V is used because we only want a single Q value
     loss_weights = [1.0]
     hidden_layers_activation_function = 'relu'
@@ -443,7 +443,7 @@

 class DDDPG(AgentParameters):
     type = 'DDPGAgent'
-    input_types = [InputTypes.Observation, InputTypes.Action]
+    input_types = {'observation': InputTypes.Observation, 'action': InputTypes.Action}
     output_types = [OutputTypes.V] # V is used because we only want a single Q value
     loss_weights = [1.0]
     hidden_layers_activation_function = 'relu'
@@ -456,7 +456,7 @@

 class NAF(AgentParameters):
     type = 'NAFAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.NAF]
     loss_weights = [1.0]
     hidden_layers_activation_function = 'tanh'
@@ -469,7 +469,7 @@

 class PPO(AgentParameters):
     type = 'PPOAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.V]
     loss_weights = [1.0]
     hidden_layers_activation_function = 'tanh'
@@ -489,7 +489,7 @@

 class ClippedPPO(AgentParameters):
     type = 'ClippedPPOAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.V, OutputTypes.PPO]
     loss_weights = [0.5, 1.0]
     stop_gradients_from_head = [False, False]
@@ -515,7 +515,11 @@

 class DFP(AgentParameters):
     type = 'DFPAgent'
-    input_types = [InputTypes.Observation, InputTypes.Measurements, InputTypes.GoalVector]
+    input_types = {
+        'observation': InputTypes.Observation,
+        'measurements': InputTypes.Measurements,
+        'goal': InputTypes.GoalVector
+    }
     output_types = [OutputTypes.MeasurementsPrediction]
     loss_weights = [1.0]
     use_measurements = True
@@ -527,7 +531,7 @@

 class MMC(AgentParameters):
     type = 'MixedMonteCarloAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.Q]
     loss_weights = [1.0]
     num_steps_between_copying_online_weights_to_target = 1000
@@ -537,7 +541,7 @@

 class PAL(AgentParameters):
     type = 'PALAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.Q]
     loss_weights = [1.0]
     pal_alpha = 0.9
@@ -548,7 +552,7 @@

 class BC(AgentParameters):
     type = 'BCAgent'
-    input_types = [InputTypes.Observation]
+    input_types = {'observation': InputTypes.Observation}
     output_types = [OutputTypes.Q]
     loss_weights = [1.0]
     collect_new_data = False
diff --git a/environments/carla_environment_wrapper.py b/environments/carla_environment_wrapper.py
index 115bdbf..b4657a3 100644
--- a/environments/carla_environment_wrapper.py
+++ b/environments/carla_environment_wrapper.py
@@ -161,7 +161,6 @@ class CarlaEnvironmentWrapper(EnvironmentWrapper):
         measurements = []
         while type(measurements) == list:
             measurements, sensor_data = self.game.read_data()
-        self.observation = sensor_data['CameraRGB'].data

         self.location = (measurements.player_measurements.transform.location.x,
                          measurements.player_measurements.transform.location.y,
@@ -181,7 +180,10 @@ class CarlaEnvironmentWrapper(EnvironmentWrapper):
                      - np.abs(self.control.steer) * 10

         # update measurements
-        self.measurements = [measurements.player_measurements.forward_speed]
+        self.observation = {
+            'observation': sensor_data['CameraRGB'].data,
+            'measurements': [measurements.player_measurements.forward_speed],
+        }
         self.autopilot = measurements.player_measurements.autopilot_control

         # action_p = ['%.2f' % member for member in [self.control.throttle, self.control.steer]]
diff --git a/environments/doom_environment_wrapper.py b/environments/doom_environment_wrapper.py
index c4a06ac..a0c618d 100644
--- a/environments/doom_environment_wrapper.py
+++ b/environments/doom_environment_wrapper.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2017 Intel Corporation 
+# Copyright (c) 2017 Intel Corporation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -135,8 +135,10 @@ class DoomEnvironmentWrapper(EnvironmentWrapper):
         # extract all data from the current state
         state = self.game.get_state()
         if state is not None and state.screen_buffer is not None:
-            self.observation = state.screen_buffer
-            self.measurements = state.game_variables
+            self.observation = {
+                'observation': state.screen_buffer,
+                'measurements': state.game_variables,
+            }
         self.reward = self.game.get_last_reward()
         self.done = self.game.is_episode_finished()

@@ -157,5 +159,3 @@ class DoomEnvironmentWrapper(EnvironmentWrapper):

     def _restart_environment_episode(self, force_environment_reset=False):
         self.game.new_episode()
-
-
diff --git a/environments/environment_wrapper.py b/environments/environment_wrapper.py
index 1491c5c..48fd0d8 100644
--- a/environments/environment_wrapper.py
+++ b/environments/environment_wrapper.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2017 Intel Corporation 
+# Copyright (c) 2017 Intel Corporation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -31,14 +31,13 @@ class EnvironmentWrapper(object):
         # env initialization
         self.game = []
         self.actions = {}
-        self.observation = []
+        self.state = []
         self.reward = 0
         self.done = False
         self.default_action = 0
         self.last_action_idx = 0
         self.episode_idx = 0
         self.last_episode_time = time.time()
-        self.measurements = []
         self.info = []
         self.action_space_low = 0
         self.action_space_high = 0
@@ -65,6 +64,22 @@ class EnvironmentWrapper(object):
         self.game_is_open = True
         self.renderer = Renderer()

+    @property
+    def measurements(self):
+        assert False
+
+    @measurements.setter
+    def measurements(self, value):
+        assert False
+
+    @property
+    def observation(self):
+        assert False
+
+    @observation.setter
+    def observation(self, value):
+        assert False
+
     def _idx_to_action(self, action_idx):
         """
         Convert an action index to one of the environment available actions.
@@ -108,7 +123,7 @@ class EnvironmentWrapper(object):
         for env_keys in self.key_to_action.keys():
             if set(env_keys) == set(self.renderer.pressed_keys):
                 return self.key_to_action[env_keys]
-        
+
         # return the default action 0 so that the environment will continue running
         return self.default_action

@@ -116,7 +131,7 @@ class EnvironmentWrapper(object):
         """
         Perform a single step on the environment using the given action
         :param action_idx: the action to perform on the environment
-        :return: A dictionary containing the observation, reward, done flag, action and measurements
+        :return: A dictionary containing the state, reward, done flag and action
         """
         self.last_action_idx = action_idx

@@ -127,13 +142,12 @@ class EnvironmentWrapper(object):
         if self.is_rendered:
             self.render()

-        self.observation = self._preprocess_observation(self.observation)
+        self.state = self._preprocess_state(self.state)

-        return {'observation': self.observation,
+        return {'state': self.state,
                 'reward': self.reward,
                 'done': self.done,
                 'action': self.last_action_idx,
-                'measurements': self.measurements,
                 'info': self.info}


@@ -146,7 +160,7 @@ class EnvironmentWrapper(object):
         """
         Reset the environment and all the variable of the wrapper
         :param force_environment_reset: forces environment reset even when the game did not end
-        :return: A dictionary containing the observation, reward, done flag, action and measurements
+        :return: A dictionary containing the state, reward, done flag and action
         """
         self._restart_environment_episode(force_environment_reset)
         self.last_episode_time = time.time()
@@ -156,17 +170,18 @@ class EnvironmentWrapper(object):
         self.last_action_idx = 0
         self._update_state()

-        # render before the preprocessing of the observation, so that the image will be in its original quality
+        # render before the preprocessing of the state, so that the image will be in its original quality
         if self.is_rendered:
             self.render()

-        self.observation = self._preprocess_observation(self.observation)
+        # TODO BUG: if the environment has not been reset, _preprocessed_state will be running on an already preprocessed state
+        # TODO: see also _update_state above
+        self.state = self._preprocess_state(self.state)

-        return {'observation': self.observation,
+        return {'state': self.state,
                 'reward': self.reward,
                 'done': self.done,
                 'action': self.last_action_idx,
-                'measurements': self.measurements,
                 'info': self.info}


@@ -181,7 +196,7 @@ class EnvironmentWrapper(object):

     def change_phase(self, phase):
         """
-        Change the current phase of the run. 
+        Change the current phase of the run.
         This is useful when different behavior is expected when testing and training
         :param phase: The running phase of the algorithm
         :type phase: RunPhase
@@ -216,19 +231,19 @@ class EnvironmentWrapper(object):
         """
         pass

-    def _preprocess_observation(self, observation):
+    def _preprocess_state(self, state):
         """
-        Do initial observation preprocessing such as cropping, rgb2gray, rescale etc.
+        Do initial state preprocessing such as cropping, rgb2gray, rescale etc.
         Implementing this function is optional.
-        :param observation: a raw observation from the environment
-        :return: the preprocessed observation
+        :param state: a raw state from the environment
+        :return: the preprocessed state
         """
-        return observation
+        return state

     def _update_state(self):
         """
         Updates the state from the environment.
-        Should update self.observation, self.reward, self.done, self.measurements and self.info
+        Should update self.state, self.reward, self.done and self.info
         :return: None
         """
         pass
@@ -243,7 +258,8 @@ class EnvironmentWrapper(object):
     def get_rendered_image(self):
         """
         Return a numpy array containing the image that will be rendered to the screen.
-        This can be different from the observation. For example, mujoco's observation is a measurements vector.
+        This can be different from the state. For example, mujoco's state is a measurements vector.
         :return: numpy array containing the image that will be rendered to the screen
         """
-        return self.observation
\ No newline at end of file
+        # TODO: probably needs revisiting
+        return self.state
diff --git a/environments/gym_environment_wrapper.py b/environments/gym_environment_wrapper.py
index a2aa290..761ccb5 100644
--- a/environments/gym_environment_wrapper.py
+++ b/environments/gym_environment_wrapper.py
@@ -60,7 +60,7 @@ class GymEnvironmentWrapper(EnvironmentWrapper):
         self.env.frameskip = self.frame_skip
         self.discrete_controls = type(self.env.action_space) != gym.spaces.box.Box

-        self.observation = self.reset(True)['observation']
+        self.state = self.reset(True)['state']

         # render
         if self.is_rendered:
@@ -70,12 +70,13 @@ class GymEnvironmentWrapper(EnvironmentWrapper):
             scale = 2
             self.renderer.create_screen(image.shape[1]*scale, image.shape[0]*scale)

-        self.is_state_type_image = len(self.observation.shape) > 1
+        # TODO: collect and store this as observation space instead
+        self.is_state_type_image = len(self.state['observation'].shape) > 1
         if self.is_state_type_image:
-            self.width = self.observation.shape[1]
-            self.height = self.observation.shape[0]
+            self.width = self.state['observation'].shape[1]
+            self.height = self.state['observation'].shape[0]
         else:
-            self.width = self.observation.shape[0]
+            self.width = self.state['observation'].shape[0]

         # action space
         self.actions_description = {}
@@ -101,6 +102,12 @@ class GymEnvironmentWrapper(EnvironmentWrapper):
         self.timestep_limit = None
         self.measurements_size = len(self.step(0)['info'].keys())

+    def _wrap_state(self, state):
+        if isinstance(self.env.observation_space, gym.spaces.Dict):
+            return state
+        else:
+            return {'observation': state}
+
     def _update_state(self):
         if hasattr(self.env, 'env') and hasattr(self.env.env, 'ale'):
             if self.phase == RunPhase.TRAIN and hasattr(self, 'current_ale_lives'):
@@ -131,28 +138,30 @@ class GymEnvironmentWrapper(EnvironmentWrapper):
             action = np.squeeze(action)
             action = np.clip(action, self.action_space_low, self.action_space_high)

-        self.observation, self.reward, self.done, self.info = self.env.step(action)
+        state, self.reward, self.done, self.info = self.env.step(action)
+        self.state = self._wrap_state(state)

-    def _preprocess_observation(self, observation):
+    def _preprocess_state(self, state):
+        # TODO: move this into wrapper
         if any(env in self.env_id for env in ["Breakout", "Pong"]):
             # crop image
-            observation = observation[34:195, :, :]
-        return observation
+            state['observation'] = state['observation'][34:195, :, :]
+        return state

     def _restart_environment_episode(self, force_environment_reset=False):
         # prevent reset of environment if there are ale lives left
         if (hasattr(self.env, 'env') and hasattr(self.env.env, 'ale') and self.env.env.ale.lives() > 0) \
                 and not force_environment_reset and not self.env._past_limit():
-            return self.observation
+            return self.state

         if self.seed:
             self.env.seed(self.seed)

-        self.observation = self.env.reset()
-        while self.observation is None:
+        self.state = self._wrap_state(self.env.reset())
+        while self.state is None:
             self.step(0)

-        return self.observation
+        return self.state

     def get_rendered_image(self):
         return self.env.render(mode='rgb_array')
diff --git a/logger.py b/logger.py
index ff05d6a..3e6a663 100644
--- a/logger.py
+++ b/logger.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2017 Intel Corporation 
+# Copyright (c) 2017 Intel Corporation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -250,13 +250,13 @@ class Logger(BaseLogger):
         if 'Training Reward' in self.data.keys() and 'Evaluation Reward' in self.data.keys():
             screen.log_title("Max training reward: {}, max evaluation reward: {}".format(self.data['Training Reward'].max(), self.data['Evaluation Reward'].max()))
         screen.separator()
-        if screen.ask_yes_no("Do you want to discard the experiment results (Warning: this cannot be undone)?", False):
-            self.remove_experiment_dir()
-        elif screen.ask_yes_no("Do you want to specify a different experiment name to save to?", False):
-            new_name = self.get_experiment_name()
-            new_path = self.get_experiment_path(new_name, create_path=False)
-            shutil.move(self.experiments_path, new_path)
-            screen.log_title("Results moved to: {}".format(new_path))
+        # if screen.ask_yes_no("Do you want to discard the experiment results (Warning: this cannot be undone)?", False):
+        #     self.remove_experiment_dir()
+        # elif screen.ask_yes_no("Do you want to specify a different experiment name to save to?", False):
+        #     new_name = self.get_experiment_name()
+        #     new_path = self.get_experiment_path(new_name, create_path=False)
+        #     shutil.move(self.experiments_path, new_path)
+        #     screen.log_title("Results moved to: {}".format(new_path))

     def get_experiment_name(self, initial_experiment_name=''):
         match = None
diff --git a/run_test.py b/run_test.py
index 7e8f6ff..696b37b 100644
--- a/run_test.py
+++ b/run_test.py
@@ -109,6 +109,7 @@ if __name__ == '__main__':
             num_workers=preset.test_num_workers,
             log_file_name=log_file_name,
         )
+        print('cmd', cmd)
         p = subprocess.Popen(cmd, shell=True, executable="/bin/bash", preexec_fn=os.setsid)

         # get the csv with the results
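Illustrative sketch (not part of the patch): the snippet below shows, under assumed shapes and a made-up batch, how the dictionary-keyed state introduced in this diff is meant to flow. An environment step returns {'state': {...}}, a batch of transitions becomes a dict of stacked arrays, last_sample() (copied from actor_critic_agent.py above) slices out a bootstrap input, and the network builds its feed dict by input name, with head placeholders getting generated names such as 'output_1_0'. Everything except last_sample() and the key names is hypothetical.

import numpy as np

# An environment step result now carries the whole state as a dictionary
# (the shapes here are invented for illustration).
result = {
    'state': {
        'observation': np.zeros((84, 84, 3)),
        'measurements': np.array([3.5]),  # e.g. forward speed in the CARLA wrapper
    },
    'reward': 0.0,
    'done': False,
}

# A batch of transitions becomes a dict of stacked arrays with the same keys.
batch_size = 32
next_states = {
    'observation': np.zeros((batch_size, 84, 84, 3)),
    'measurements': np.zeros((batch_size, 1)),
}

# last_sample() keeps the dict structure while slicing out the final transition
# with a length-1 batch axis, so the result can be passed straight to predict().
def last_sample(state):
    return {k: np.expand_dims(v[-1], 0) for k, v in state.items()}

bootstrap_input = last_sample(next_states)
assert bootstrap_input['observation'].shape == (1, 84, 84, 3)

# Inside TensorFlowArchitecture, placeholders are now stored in a dict as well,
# so the feed dict is built by name instead of by position:
#     feed_dict = {self.inputs[name]: value for name, value in inputs.items()}
# Head placeholders get generated names like 'output_1_0' (head 1, placeholder 0),
# which is why ActorCriticAgent passes the actions as inputs['output_1_0'].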