import uuid

from rl_coach.data_stores.data_store import DataStore, DataStoreParameters
from kubernetes import client as k8sclient


class NFSDataStoreParameters(DataStoreParameters):
    """Parameters describing an NFS-backed data store.

    Besides the base data-store settings, this object records the names of
    the Kubernetes objects that NFSDataStore creates (deployment, service,
    PV, PVC) so that they can be deleted again later.
    """

    def __init__(self, ds_params, deployed=False, server=None, path=None):
        """
        :param ds_params: base parameters; its store_type, orchestrator_type
            and orchestrator_params are forwarded to DataStoreParameters.
            If orchestrator_params contains a "namespace" key, that value is
            used instead of "default".
        :param deployed: True when an NFS server already exists and must not
            be created/removed by NFSDataStore.
        :param server: address of the existing NFS server (used only when
            `deployed` is True).
        :param path: exported path on the existing NFS server (used only
            when `deployed` is True); "/" is kept when it is None.
        """
        super().__init__(ds_params.store_type, ds_params.orchestrator_type, ds_params.orchestrator_params)
        self.namespace = "default"
        if "namespace" in ds_params.orchestrator_params:
            self.namespace = ds_params.orchestrator_params["namespace"]
        # Names of the created Kubernetes objects; filled in on successful creation.
        self.name = None
        self.pvc_name = None
        self.pv_name = None
        self.svc_name = None
        self.server = None
        self.path = "/"
        self.deployed = deployed
        if deployed:
            self.server = server
            # Keep the "/" default rather than overwriting it with None, which
            # would later be handed to V1NFSVolumeSource as the export path.
            if path is not None:
                self.path = path


class NFSDataStore(DataStore):
    """Data store that exposes an NFS export to Kubernetes through a PV/PVC pair."""

    def __init__(self, params: NFSDataStoreParameters):
        """
        :param params: NFS data store parameters; mutated by the deploy
            methods to record the names of the objects they create.
        """
        self.params = params

    def deploy(self) -> bool:
        """Create the NFS server (unless one is already deployed) and the PV/PVC.

        Nothing is done for orchestrators other than "kubernetes".

        :return: False as soon as one creation step fails, True otherwise.
        """
        if self.params.orchestrator_type == "kubernetes":
            if not self.params.deployed:
                if not self.deploy_k8s_nfs():
                    return False
            # The PV/PVC are created both for a server created above and for
            # a pre-existing one described by params.server / params.path.
            if not self.create_k8s_nfs_resources():
                return False

        return True

    def get_info(self):
        """Return a PVC volume source that refers to params.pvc_name.

        params.pvc_name is only set once create_k8s_nfs_resources() succeeded.
        """
        return k8sclient.V1PersistentVolumeClaimVolumeSource(
            claim_name=self.params.pvc_name
        )

    def undeploy(self) -> bool:
        """Delete the NFS server (unless it was pre-deployed) and the PV/PVC.

        Nothing is done for orchestrators other than "kubernetes".

        :return: False as soon as one deletion step fails, True otherwise.
        """
        if self.params.orchestrator_type == "kubernetes":
            if not self.params.deployed:
                if not self.undeploy_k8s_nfs():
                    return False
            if not self.delete_k8s_nfs_resources():
                return False

        return True

    def save_to_store(self):
        """No-op in this data store."""
        pass

    def load_from_store(self):
        """No-op in this data store."""
        pass

    def deploy_k8s_nfs(self) -> bool:
        """Create the "nfs-server" deployment and the "nfs-service" service.

        On success params.name, params.svc_name and params.server are set.

        :return: False if the Kubernetes API rejects either object, True otherwise.
        """
        name = "nfs-server"
        container = k8sclient.V1Container(
            name=name,
            image="k8s.gcr.io/volume-nfs:0.8",
            ports=[
                k8sclient.V1ContainerPort(
                    name="nfs",
                    container_port=2049,
                    protocol="TCP"
                ),
                k8sclient.V1ContainerPort(
                    name="rpcbind",
                    container_port=111
                ),
                k8sclient.V1ContainerPort(
                    name="mountd",
                    container_port=20048
                ),
            ],
            volume_mounts=[k8sclient.V1VolumeMount(
                name='nfs-host-path',
                mount_path='/exports'
            )],
            security_context=k8sclient.V1SecurityContext(privileged=True)
        )
        template = k8sclient.V1PodTemplateSpec(
            metadata=k8sclient.V1ObjectMeta(labels={'app': 'nfs-server'}),
            spec=k8sclient.V1PodSpec(
                containers=[container],
                volumes=[k8sclient.V1Volume(
                    name="nfs-host-path",
                    # A random suffix gives each deployment its own host directory.
                    host_path=k8sclient.V1HostPathVolumeSource(path='/tmp/nfsexports-{}'.format(uuid.uuid4()))
                )]
            )
        )
        deployment_spec = k8sclient.V1DeploymentSpec(
            replicas=1,
            template=template,
            selector=k8sclient.V1LabelSelector(
                match_labels={'app': 'nfs-server'}
            )
        )

        deployment = k8sclient.V1Deployment(
            api_version='apps/v1',
            kind='Deployment',
            metadata=k8sclient.V1ObjectMeta(name=name, labels={'app': 'nfs-server'}),
            spec=deployment_spec
        )

        k8s_apps_v1_api_client = k8sclient.AppsV1Api()
        try:
            k8s_apps_v1_api_client.create_namespaced_deployment(self.params.namespace, deployment)
            self.params.name = name
        except k8sclient.rest.ApiException as e:
            print("Got exception: {}\n while creating nfs-server".format(e))
            return False

        k8s_core_v1_api_client = k8sclient.CoreV1Api()

        svc_name = "nfs-service"
        service = k8sclient.V1Service(
            api_version='v1',
            kind='Service',
            metadata=k8sclient.V1ObjectMeta(
                name=svc_name
            ),
            spec=k8sclient.V1ServiceSpec(
                # params.name equals the pod label value set on the deployment above.
                selector={'app': self.params.name},
                ports=[k8sclient.V1ServicePort(
                    protocol='TCP',
                    port=2049,
                    target_port=2049
                )]
            )
        )

        try:
            k8s_core_v1_api_client.create_namespaced_service(self.params.namespace, service)
            self.params.svc_name = svc_name
            self.params.server = 'nfs-service.{}.svc.cluster.local'.format(self.params.namespace)
        except k8sclient.rest.ApiException as e:
            print("Got exception: {}\n while creating a service for nfs-server".format(e))
            return False

        return True

    def create_k8s_nfs_resources(self) -> bool:
        """Create the "nfs-ckpt-pv" PV and the "nfs-ckpt-pvc" PVC.

        The PV points at params.server / params.path; the PVC selects the PV
        through its 'app' label. On success params.pv_name and
        params.pvc_name are set.

        :return: False if the Kubernetes API rejects either object, True otherwise.
        """
        pv_name = "nfs-ckpt-pv"
        persistent_volume = k8sclient.V1PersistentVolume(
            api_version="v1",
            kind="PersistentVolume",
            metadata=k8sclient.V1ObjectMeta(
                name=pv_name,
                labels={'app': pv_name}
            ),
            spec=k8sclient.V1PersistentVolumeSpec(
                access_modes=["ReadWriteMany"],
                nfs=k8sclient.V1NFSVolumeSource(
                    path=self.params.path,
                    server=self.params.server
                ),
                capacity={'storage': '10Gi'},
                storage_class_name=""
            )
        )
        k8s_api_client = k8sclient.CoreV1Api()
        try:
            k8s_api_client.create_persistent_volume(persistent_volume)
            self.params.pv_name = pv_name
        except k8sclient.rest.ApiException as e:
            print("Got exception: {}\n while creating the NFS PV".format(e))
            return False

        pvc_name = "nfs-ckpt-pvc"
        persistent_volume_claim = k8sclient.V1PersistentVolumeClaim(
            api_version="v1",
            kind="PersistentVolumeClaim",
            metadata=k8sclient.V1ObjectMeta(
                name=pvc_name
            ),
            spec=k8sclient.V1PersistentVolumeClaimSpec(
                access_modes=["ReadWriteMany"],
                resources=k8sclient.V1ResourceRequirements(
                    requests={'storage': '10Gi'}
                ),
                selector=k8sclient.V1LabelSelector(
                    match_labels={'app': self.params.pv_name}
                ),
                storage_class_name=""
            )
        )

        try:
            k8s_api_client.create_namespaced_persistent_volume_claim(self.params.namespace, persistent_volume_claim)
            self.params.pvc_name = pvc_name
        except k8sclient.rest.ApiException as e:
            print("Got exception: {}\n while creating the NFS PVC".format(e))
            return False

        return True

    def undeploy_k8s_nfs(self) -> bool:
        """Delete the deployment params.name and the service params.svc_name.

        :return: False if the Kubernetes API rejects either deletion, True otherwise.
        """
        del_options = k8sclient.V1DeleteOptions()

        k8s_apps_v1_api_client = k8sclient.AppsV1Api()
        try:
            # `body` is passed by keyword: recent kubernetes clients no longer
            # accept the delete options as a third positional argument.
            k8s_apps_v1_api_client.delete_namespaced_deployment(
                self.params.name, self.params.namespace, body=del_options
            )
        except k8sclient.rest.ApiException as e:
            print("Got exception: {}\n while deleting nfs-server".format(e))
            return False

        k8s_core_v1_api_client = k8sclient.CoreV1Api()
        try:
            k8s_core_v1_api_client.delete_namespaced_service(
                self.params.svc_name, self.params.namespace, body=del_options
            )
        except k8sclient.rest.ApiException as e:
            print("Got exception: {}\n while deleting the service for nfs-server".format(e))
            return False

        return True

    def delete_k8s_nfs_resources(self) -> bool:
        """Delete the PV params.pv_name and the PVC params.pvc_name.

        :return: False if the Kubernetes API rejects either deletion, True otherwise.
        """
        del_options = k8sclient.V1DeleteOptions()

        k8s_api_client = k8sclient.CoreV1Api()

        try:
            # `body` is passed by keyword for compatibility with recent clients.
            k8s_api_client.delete_persistent_volume(self.params.pv_name, body=del_options)
        except k8sclient.rest.ApiException as e:
            print("Got exception: {}\n while deleting NFS PV".format(e))
            return False

        try:
            k8s_api_client.delete_namespaced_persistent_volume_claim(
                self.params.pvc_name, self.params.namespace, body=del_options
            )
        except k8sclient.rest.ApiException as e:
            print("Got exception: {}\n while deleting NFS PVC".format(e))
            return False

        return True