diff --git a/rl_coach/memories/non_episodic/distributed_experience_replay.py b/rl_coach/memories/non_episodic/distributed_experience_replay.py
index 6b14e81..61b24b0 100644
--- a/rl_coach/memories/non_episodic/distributed_experience_replay.py
+++ b/rl_coach/memories/non_episodic/distributed_experience_replay.py
@@ -68,7 +68,10 @@ class DistributedExperienceReplay(Memory):
         """
         Get the number of transitions in the ER
         """
-        return self.redis_connection.info(section='keyspace')['db{}'.format(self.db)]['keys']
+        try:
+            return self.redis_connection.info(section='keyspace')['db{}'.format(self.db)]['keys']
+        except Exception:
+            return 0
 
     def sample(self, size: int) -> List[Transition]:
         """
diff --git a/rl_coach/orchestrators/kubernetes_orchestrator.py b/rl_coach/orchestrators/kubernetes_orchestrator.py
index 6bdcd8c..4bb8702 100644
--- a/rl_coach/orchestrators/kubernetes_orchestrator.py
+++ b/rl_coach/orchestrators/kubernetes_orchestrator.py
@@ -1,13 +1,15 @@
+import uuid
 from rl_coach.orchestrators.deploy import Deploy, DeployParameters
 
 from kubernetes import client, config
 
 
 class KubernetesParameters(DeployParameters):
 
-    def __init__(self, image: str, command: list(), arguments: list() = list(), synchronized: bool = False,
+    def __init__(self, name: str, image: str, command: list(), arguments: list() = list(), synchronized: bool = False,
                  num_workers: int = 1, kubeconfig: str = None, namespace: str = None, redis_ip: str = None,
-                 redis_port: int = None, redis_db: int = 0):
+                 redis_port: int = None, redis_db: int = 0, nfs_server: str = None, nfs_path: str = None,
+                 checkpoint_dir: str = '/checkpoint'):
         self.image = image
         self.synchronized = synchronized
         self.command = command
@@ -18,6 +20,10 @@ class KubernetesParameters(DeployParameters):
         self.redis_ip = redis_ip
         self.redis_port = redis_port
         self.redis_db = redis_db
+        self.nfs_server = nfs_server
+        self.nfs_path = nfs_path
+        self.checkpoint_dir = checkpoint_dir
+        self.name = name
 
 
 class Kubernetes(Deploy):
@@ -25,8 +31,6 @@ class Kubernetes(Deploy):
     def __init__(self, deploy_parameters: KubernetesParameters):
         super().__init__(deploy_parameters)
         self.deploy_parameters = deploy_parameters
-
-    def setup(self) -> bool:
         if self.deploy_parameters.kubeconfig:
             config.load_kube_config()
         else:
@@ -35,6 +39,9 @@ class Kubernetes(Deploy):
         if not self.deploy_parameters.namespace:
             _, current_context = config.list_kube_config_contexts()
             self.deploy_parameters.namespace = current_context['context']['namespace']
+        self.nfs_pvc_name = 'nfs-checkpoint-pvc'
+
+    def setup(self) -> bool:
 
         if not self.deploy_parameters.redis_ip:
             # Need to spin up a redis service and a deployment.
@@ -42,10 +49,61 @@
             print("Failed to setup redis")
             return False
 
-        self.deploy_parameters.command += ['--redis_ip', self.deploy_parameters.redis_ip, '--redis_port', '{}'.format(self.deploy_parameters.redis_port)]
+        if not self.create_nfs_resources():
+            return False
 
         return True
 
+    def create_nfs_resources(self):
+        persistent_volume = client.V1PersistentVolume(
+            api_version="v1",
+            kind="PersistentVolume",
+            metadata=client.V1ObjectMeta(
+                name='nfs-checkpoint-pv',
+                labels={'app': 'nfs-checkpoint-pv'}
+            ),
+            spec=client.V1PersistentVolumeSpec(
+                access_modes=["ReadWriteMany"],
+                nfs=client.V1NFSVolumeSource(
+                    path=self.deploy_parameters.nfs_path,
+                    server=self.deploy_parameters.nfs_server
+                ),
+                capacity={'storage': '10Gi'},
+                storage_class_name=""
+            )
+        )
+        api_client = client.CoreV1Api()
+        try:
+            api_client.create_persistent_volume(persistent_volume)
+        except client.rest.ApiException as e:
+            print("Got exception: {} while creating the NFS PV".format(e))
+            return False
+
+        persistent_volume_claim = client.V1PersistentVolumeClaim(
+            api_version="v1",
+            kind="PersistentVolumeClaim",
+            metadata=client.V1ObjectMeta(
+                name="nfs-checkpoint-pvc"
+            ),
+            spec=client.V1PersistentVolumeClaimSpec(
+                access_modes=["ReadWriteMany"],
+                resources=client.V1ResourceRequirements(
+                    requests={'storage': '10Gi'}
+                ),
+                selector=client.V1LabelSelector(
+                    match_labels={'app': 'nfs-checkpoint-pv'}
+                ),
+                storage_class_name=""
+            )
+        )
+
+        try:
+            api_client.create_namespaced_persistent_volume_claim(self.deploy_parameters.namespace, persistent_volume_claim)
+        except client.rest.ApiException as e:
+            print("Got exception: {} while creating the NFS PVC".format(e))
+            return False
+        return True
+
     def deploy_redis(self) -> bool:
         container = client.V1Container(
             name="redis-server",
@@ -107,37 +165,52 @@ class Kubernetes(Deploy):
             return False
 
     def deploy(self) -> bool:
+
+        self.deploy_parameters.command += ['--redis_ip', self.deploy_parameters.redis_ip, '--redis_port', '{}'.format(self.deploy_parameters.redis_port)]
+
         if self.deploy_parameters.synchronized:
-            return self.create_k8s_job()
-        else:
             return self.create_k8s_deployment()
+        else:
+            return self.create_k8s_job()
 
     def create_k8s_deployment(self) -> bool:
+        name = "{}-{}".format(self.deploy_parameters.name, uuid.uuid4())
+
         container = client.V1Container(
-            name="worker",
+            name=name,
             image=self.deploy_parameters.image,
             command=self.deploy_parameters.command,
            args=self.deploy_parameters.arguments,
-            image_pull_policy='Always'
+            image_pull_policy='Always',
+            volume_mounts=[client.V1VolumeMount(
+                name='nfs-pvc',
+                mount_path=self.deploy_parameters.checkpoint_dir
+            )]
         )
         template = client.V1PodTemplateSpec(
-            metadata=client.V1ObjectMeta(labels={'app': 'worker'}),
+            metadata=client.V1ObjectMeta(labels={'app': name}),
             spec=client.V1PodSpec(
-                containers=[container]
-            )
+                containers=[container],
+                volumes=[client.V1Volume(
+                    name="nfs-pvc",
+                    persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
+                        claim_name=self.nfs_pvc_name
+                    )
+                )]
+            ),
         )
 
         deployment_spec = client.V1DeploymentSpec(
             replicas=self.deploy_parameters.num_workers,
             template=template,
             selector=client.V1LabelSelector(
-                match_labels={'app': 'worker'}
+                match_labels={'app': name}
             )
         )
 
         deployment = client.V1Deployment(
             api_version='apps/v1',
             kind='Deployment',
-            metadata=client.V1ObjectMeta(name='rollout-worker'),
+            metadata=client.V1ObjectMeta(name=name),
             spec=deployment_spec
         )
@@ -150,4 +223,50 @@ class Kubernetes(Deploy):
             return False
 
     def create_k8s_job(self):
-        pass
+        name = "{}-{}".format(self.deploy_parameters.name, uuid.uuid4())
+
+        container = client.V1Container(
+            name=name,
+            image=self.deploy_parameters.image,
+            command=self.deploy_parameters.command,
+            args=self.deploy_parameters.arguments,
+            image_pull_policy='Always',
+            volume_mounts=[client.V1VolumeMount(
+                name='nfs-pvc',
+                mount_path=self.deploy_parameters.checkpoint_dir
+            )]
+        )
+        template = client.V1PodTemplateSpec(
+            metadata=client.V1ObjectMeta(labels={'app': name}),
+            spec=client.V1PodSpec(
+                containers=[container],
+                volumes=[client.V1Volume(
+                    name="nfs-pvc",
+                    persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
+                        claim_name=self.nfs_pvc_name
+                    )
+                )],
+                restart_policy='Never'
+            ),
+        )
+
+        job_spec = client.V1JobSpec(
+            parallelism=self.deploy_parameters.num_workers,
+            template=template,
+            completions=2147483647
+        )
+
+        job = client.V1Job(
+            api_version='batch/v1',
+            kind='Job',
+            metadata=client.V1ObjectMeta(name=name),
+            spec=job_spec
+        )
+
+        api_client = client.BatchV1Api()
+        try:
+            api_client.create_namespaced_job(self.deploy_parameters.namespace, job)
+            return True
+        except client.rest.ApiException as e:
+            print("Got exception: {} while creating job".format(e))
+            return False
diff --git a/rl_coach/orchestrators/start_training.py b/rl_coach/orchestrators/start_training.py
index 9950b61..ba28a25 100644
--- a/rl_coach/orchestrators/start_training.py
+++ b/rl_coach/orchestrators/start_training.py
@@ -3,30 +3,48 @@
 import argparse
 
 from rl_coach.orchestrators.kubernetes_orchestrator import KubernetesParameters, Kubernetes
 
 
-def main(preset, image='ajaysudh/testing:coach', redis_ip='redis-service.ajay.svc'):
+def main(preset: str, image: str='ajaysudh/testing:coach', redis_ip: str=None, redis_port:int=None, num_workers: int=1,
+         nfs_server: str="", nfs_path: str=""):
     rollout_command = ['python3', 'rl_coach/rollout_worker.py', '-p', preset]
     training_command = ['python3', 'rl_coach/training_worker.py', '-p', preset]
 
-    rollout_params = KubernetesParameters(image, rollout_command, redis_ip=redis_ip, redis_port=6379, num_workers=1)
-    training_params = KubernetesParameters(image, training_command, redis_ip=redis_ip, redis_port=6379, num_workers=1)
+    """
+    TODO:
+    1. Create a NFS backed PV for checkpointing.
+        a. Include that in both (worker, trainer) containers.
+        b. Change checkpoint writing logic to always write to a temporary file and then rename.
+    2. Test e2e 1 loop.
+        a. Trainer writes a checkpoint
+        b. Rollout worker picks it and gathers experience, writes back to redis.
+        c. 1 rollout worker, 1 trainer.
+    3. Trainer should be a job (not a deployment)
+        a. When all the epochs of training are done, workers should also be deleted.
+    4. Test e2e with multiple rollout workers.
+    5. Test e2e with multiple rollout workers and multiple loops.
+    """
+    training_params = KubernetesParameters("train", image, training_command, kubeconfig='~/.kube/config', redis_ip=redis_ip, redis_port=redis_port,
+                                           nfs_server=nfs_server, nfs_path=nfs_path)
     training_obj = Kubernetes(training_params)
     if not training_obj.setup():
         print("Could not setup")
+        return
 
-    rollout_obj = Kubernetes(training_params)
-    if not rollout_obj.setup():
-        print("Could not setup")
-
+    rollout_params = KubernetesParameters("worker", image, rollout_command, kubeconfig='~/.kube/config', redis_ip=training_params.redis_ip, redis_port=training_params.redis_port, num_workers=num_workers)
+    rollout_obj = Kubernetes(rollout_params)
+    # if not rollout_obj.setup():
+    #     print("Could not setup")
     if training_obj.deploy():
         print("Successfully deployed")
     else:
         print("Could not deploy")
+        return
 
     if rollout_obj.deploy():
         print("Successfully deployed")
     else:
         print("Could not deploy")
+        return
 
 if __name__ == '__main__':
@@ -39,10 +57,19 @@
                         help="(string) Name of a preset to run (class name from the 'presets' directory.)",
                         type=str,
                         required=True)
+    parser.add_argument('-ns', '--nfs-server',
+                        help="(string) Address of the nfs server.",
+                        type=str,
+                        required=True)
+    parser.add_argument('-np', '--nfs-path',
+                        help="(string) Exported path for the nfs server",
+                        type=str,
+                        required=True)
+
     # parser.add_argument('--checkpoint_dir',
     #                     help='(string) Path to a folder containing a checkpoint to write the model to.',
     #                     type=str,
     #                     default='/checkpoint')
 
     args = parser.parse_args()
-    main(preset=args.preset, image=args.image)
+    main(preset=args.preset, image=args.image, nfs_server=args.nfs_server, nfs_path=args.nfs_path)
diff --git a/rl_coach/training_worker.py b/rl_coach/training_worker.py
index 1b9ddea..507ae50 100644
--- a/rl_coach/training_worker.py
+++ b/rl_coach/training_worker.py
@@ -1,22 +1,19 @@
 """
 """
 import argparse
+import time
 
 from rl_coach.base_parameters import TaskParameters
 from rl_coach.coach import expand_preset
 from rl_coach import core_types
 from rl_coach.utils import short_dynamic_import
+from rl_coach.memories.non_episodic.distributed_experience_replay import DistributedExperienceReplay
+from rl_coach.memories.memory import MemoryGranularity
 
 # Q: specify alternative distributed memory, or should this go in the preset?
 # A: preset must define distributed memory to be used. we aren't going to take a non-distributed preset and automatically distribute it.
 
 
-def heatup(graph_manager):
-    num_steps = graph_manager.schedule_params.heatup_steps.num_steps
-    while len(graph_manager.agent_params.memory) < num_steps:
-        time.sleep(1)
-
-
 def training_worker(graph_manager, checkpoint_dir):
     """
     restore a checkpoint then perform rollouts using the restored model
@@ -29,10 +26,20 @@ def training_worker(graph_manager, checkpoint_dir):
     # save randomly initialized graph
     graph_manager.save_checkpoint()
 
-    heatup(graph_manager)
+    memory = DistributedExperienceReplay(max_size=(MemoryGranularity.Transitions, 1000000),
+                                         redis_ip=graph_manager.agent_params.memory.redis_ip,
+                                         redis_port=graph_manager.agent_params.memory.redis_port)
+
+    while(memory.num_transitions() < 100):
+        time.sleep(10)
 
+    # TODO: critical: wait for minimum number of rollouts in memory before training
+    # TODO: Q: training steps passed into graph_manager.train ignored?
+    # TODO: specify training steps between checkpoints (in preset?)
+    # TODO: replace while true with what? number of steps, convergence, time, ...
+    # TODO: low: move evaluate out of this process
     # training loop
-    for _ in range(10):
+    for _ in range(40):
         graph_manager.phase = core_types.RunPhase.TRAIN
         graph_manager.train(core_types.TrainingSteps(1))
         graph_manager.phase = core_types.RunPhase.UNDEFINED
@@ -72,5 +79,6 @@ def main():
         checkpoint_dir=args.checkpoint_dir,
     )
 
+
 if __name__ == '__main__':
     main()