diff --git a/rl_coach/data_stores/nfs_data_store.py b/rl_coach/data_stores/nfs_data_store.py
index 6946139..b937b0e 100644
--- a/rl_coach/data_stores/nfs_data_store.py
+++ b/rl_coach/data_stores/nfs_data_store.py
@@ -1,3 +1,5 @@
+import uuid
+
 from rl_coach.data_stores.data_store import DataStore, DataStoreParameters
 from kubernetes import client as k8sclient
 
@@ -61,15 +63,33 @@ class NFSDataStore(DataStore):
                 name=name,
                 image="k8s.gcr.io/volume-nfs:0.8",
                 ports=[k8sclient.V1ContainerPort(
-                    name="nfs",
-                    container_port=2049,
-                    protocol="TCP"
-                )]
+                        name="nfs",
+                        container_port=2049,
+                        protocol="TCP"
+                    ),
+                    k8sclient.V1ContainerPort(
+                        name="rpcbind",
+                        container_port=111
+                    ),
+                    k8sclient.V1ContainerPort(
+                        name="mountd",
+                        container_port=20048
+                    ),
+                ],
+                volume_mounts=[k8sclient.V1VolumeMount(
+                    name='nfs-host-path',
+                    mount_path='/exports'
+                )],
+                security_context=k8sclient.V1SecurityContext(privileged=True)
             )
             template = k8sclient.V1PodTemplateSpec(
                 metadata=k8sclient.V1ObjectMeta(labels={'app': 'nfs-server'}),
                 spec=k8sclient.V1PodSpec(
-                    containers=[container]
+                    containers=[container],
+                    volumes=[k8sclient.V1Volume(
+                        name="nfs-host-path",
+                        host_path=k8sclient.V1HostPathVolumeSource(path='/tmp/nfsexports-{}'.format(uuid.uuid4()))
+                    )]
                 )
             )
             deployment_spec = k8sclient.V1DeploymentSpec(
@@ -117,7 +137,7 @@ class NFSDataStore(DataStore):
         try:
             k8s_core_v1_api_client.create_namespaced_service(self.params.namespace, service)
             self.params.svc_name = svc_name
-            self.params.server = 'nfs-service.{}.svc'.format(self.params.namespace)
+            self.params.server = 'nfs-service.{}.svc.cluster.local'.format(self.params.namespace)
         except k8sclient.rest.ApiException as e:
             print("Got exception: %s\n while creating a service for nfs-server", e)
             return False
diff --git a/rl_coach/orchestrators/kubernetes_orchestrator.py b/rl_coach/orchestrators/kubernetes_orchestrator.py
index 8f5f405..a0a8ba9 100644
--- a/rl_coach/orchestrators/kubernetes_orchestrator.py
+++ b/rl_coach/orchestrators/kubernetes_orchestrator.py
@@ -62,8 +62,6 @@ class Kubernetes(Deploy):
             _, current_context = k8sconfig.list_kube_config_contexts()
             self.params.namespace = current_context['context']['namespace']
 
-        self.nfs_pvc_name = 'nfs-checkpoint-pvc'
-
         if os.environ.get('http_proxy'):
             k8sclient.Configuration._default.proxy = os.environ.get('http_proxy')
 
@@ -93,6 +91,8 @@ class Kubernetes(Deploy):
         self.memory_backend.deploy()
         if not self.data_store.deploy():
             return False
+        if self.params.data_store_params.store_type == "nfs":
+            self.nfs_pvc = self.data_store.get_info()
         return True
 
     def deploy_trainer(self) -> bool:
@@ -124,9 +124,7 @@ class Kubernetes(Deploy):
                 containers=[container],
                 volumes=[k8sclient.V1Volume(
                     name="nfs-pvc",
-                    persistent_volume_claim=k8sclient.V1PersistentVolumeClaimVolumeSource(
-                        claim_name=self.nfs_pvc_name
-                    )
+                    persistent_volume_claim=self.nfs_pvc
                 )]
             ),
         )
@@ -179,7 +177,7 @@
             worker_params.command += ['--memory-backend-params', json.dumps(self.params.memory_backend_parameters.__dict__)]
 
         worker_params.command += ['--data-store-params', json.dumps(self.params.data_store_params.__dict__)]
-        worker_params.command += ['--num-workers', worker_params.num_replicas]
+        worker_params.command += ['--num-workers', '{}'.format(worker_params.num_replicas)]
 
         name = "{}-{}".format(worker_params.run_type, uuid.uuid4())
 
@@ -201,9 +199,7 @@
                 containers=[container],
                 volumes=[k8sclient.V1Volume(
                     name="nfs-pvc",
-                    persistent_volume_claim=k8sclient.V1PersistentVolumeClaimVolumeSource(
-                        claim_name=self.nfs_pvc_name
-                    )
+                    persistent_volume_claim=self.nfs_pvc
                 )],
             ),
         )
diff --git a/rl_coach/orchestrators/start_training.py b/rl_coach/orchestrators/start_training.py
index 6f93a7d..100a870 100644
--- a/rl_coach/orchestrators/start_training.py
+++ b/rl_coach/orchestrators/start_training.py
@@ -9,7 +9,7 @@ from rl_coach.data_stores.nfs_data_store import NFSDataStoreParameters
 
 def main(preset: str, image: str='ajaysudh/testing:coach', num_workers: int=1, nfs_server: str=None, nfs_path: str=None,
          memory_backend: str=None, data_store: str=None, s3_end_point: str=None, s3_bucket_name: str=None,
-         policy_type: str="OFF"):
+         s3_creds_file: str=None, policy_type: str="OFF"):
 
     rollout_command = ['python3', 'rl_coach/rollout_worker.py', '-p', preset, '--policy-type', policy_type]
     training_command = ['python3', 'rl_coach/training_worker.py', '-p', preset, '--policy-type', policy_type]
@@ -21,7 +21,7 @@
     if data_store == "s3":
         ds_params = DataStoreParameters("s3", "", "")
         ds_params_instance = S3DataStoreParameters(ds_params=ds_params, end_point=s3_end_point, bucket_name=s3_bucket_name,
-                                                   checkpoint_dir="/checkpoint")
+                                                   creds_file=s3_creds_file, checkpoint_dir="/checkpoint")
     elif data_store == "nfs":
         ds_params = DataStoreParameters("nfs", "kubernetes", {"namespace": "default"})
         ds_params_instance = NFSDataStoreParameters(ds_params)
@@ -70,16 +70,18 @@
     parser.add_argument('--memory-backend',
                         help="(string) Memory backend to use.",
                         type=str,
+                        choices=['redispubsub'],
                         default="redispubsub")
-    parser.add_argument('-ds', '--data-store',
+    parser.add_argument('--data-store',
                         help="(string) Data store to use.",
                         type=str,
+                        choices=['s3', 'nfs'],
                         default="s3")
-    parser.add_argument('-ns', '--nfs-server',
+    parser.add_argument('--nfs-server',
                         help="(string) Addresss of the nfs server.",
                         type=str,
                         required=False)
-    parser.add_argument('-np', '--nfs-path',
+    parser.add_argument('--nfs-path',
                         help="(string) Exported path for the nfs server.",
                         type=str,
                         required=False)
@@ -91,14 +93,19 @@
                         help="(string) S3 bucket name to use when S3 data store is used.",
                         type=str,
                         required=False)
+    parser.add_argument('--s3-creds-file',
+                        help="(string) S3 credentials file to use when S3 data store is used.",
+                        type=str,
+                        required=False)
     parser.add_argument('--num-workers',
-                        help="(string) Number of rollout workers",
+                        help="(string) Number of rollout workers.",
                         type=int,
                         required=False,
                         default=1)
     parser.add_argument('--policy-type',
-                        help="(string) The type of policy: OFF/ON",
+                        help="(string) The type of policy: OFF/ON.",
                         type=str,
+                        choices=['ON', 'OFF'],
                         default='OFF')
 
     # parser.add_argument('--checkpoint_dir',
@@ -109,4 +116,5 @@
 
     main(preset=args.preset, image=args.image, nfs_server=args.nfs_server, nfs_path=args.nfs_path,
          memory_backend=args.memory_backend, data_store=args.data_store, s3_end_point=args.s3_end_point,
-         s3_bucket_name=args.s3_bucket_name, num_workers=args.num_workers, policy_type=args.policy_type)
+         s3_bucket_name=args.s3_bucket_name, s3_creds_file=args.s3_creds_file, num_workers=args.num_workers,
+         policy_type=args.policy_type)
diff --git a/rl_coach/training_worker.py b/rl_coach/training_worker.py
index 4a6e769..922ae72 100644
--- a/rl_coach/training_worker.py
+++ b/rl_coach/training_worker.py
@@ -25,7 +25,7 @@ def training_worker(graph_manager, checkpoint_dir, policy_type):
     # initialize graph
     task_parameters = TaskParameters()
     task_parameters.__dict__['save_checkpoint_dir'] = checkpoint_dir
-    task_parameters.__dict__['save_checkpoint_secs'] = 60
+    task_parameters.__dict__['save_checkpoint_secs'] = 20
     graph_manager.create_graph(task_parameters)
 
     # save randomly initialized graph
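
Note: the kubernetes_orchestrator.py hunks above drop the hard-coded claim name and instead pass whatever self.data_store.get_info() returns straight into V1Volume(persistent_volume_claim=...). get_info() itself is not part of this patch; the sketch below only illustrates the return type that usage implies, and the pvc_name attribute in it is hypothetical (standing in for wherever the data store records the claim it created during deploy()):

    # Sketch only -- not part of this patch. The orchestrator assigns the
    # return value directly to V1Volume(persistent_volume_claim=...), so it
    # must already be a V1PersistentVolumeClaimVolumeSource, not a bare name.
    from kubernetes import client as k8sclient

    def get_info(self):
        # self.pvc_name is hypothetical: the claim created during deploy()
        return k8sclient.V1PersistentVolumeClaimVolumeSource(claim_name=self.pvc_name)

On the CLI side, the tightened argparse choices in start_training.py make invalid values (e.g. --data-store gcs) fail at parse time rather than inside main(); an NFS run is selected with --data-store nfs, and the new --s3-creds-file flag applies only when --data-store s3 is used.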