1
0
mirror of https://github.com/gryf/coach.git synced 2025-12-17 19:20:19 +01:00

Make distributed coach work end-to-end.

- With data store, memory backend and orchestrator interfaces.
This commit is contained in:
Balaji Subramaniam
2018-10-04 12:28:21 -07:00
committed by zach dwiel
parent 9f92064e67
commit 844a5af831
8 changed files with 300 additions and 169 deletions

View File

@@ -4,9 +4,11 @@ import json
import time
from typing import List
from rl_coach.orchestrators.deploy import Deploy, DeployParameters
from kubernetes import client, config
from kubernetes import client as k8sclient, config as k8sconfig
from rl_coach.memories.backend.memory import MemoryBackendParameters
from rl_coach.memories.backend.memory_impl import get_memory_backend
from rl_coach.data_stores.data_store import DataStoreParameters
from rl_coach.data_stores.data_store_impl import get_data_store
class RunTypeParameters():
@@ -29,8 +31,9 @@ class RunTypeParameters():
class KubernetesParameters(DeployParameters):
def __init__(self, run_type_params: List[RunTypeParameters], kubeconfig: str = None, namespace: str = "", nfs_server: str = None,
nfs_path: str = None, checkpoint_dir: str = '/checkpoint', memory_backend_parameters: MemoryBackendParameters = None):
def __init__(self, run_type_params: List[RunTypeParameters], kubeconfig: str = None, namespace: str = None,
nfs_server: str = None, nfs_path: str = None, checkpoint_dir: str = '/checkpoint',
memory_backend_parameters: MemoryBackendParameters = None, data_store_params: DataStoreParameters = None):
self.run_type_params = {}
for run_type_param in run_type_params:
@@ -41,195 +44,204 @@ class KubernetesParameters(DeployParameters):
self.nfs_path = nfs_path
self.checkpoint_dir = checkpoint_dir
self.memory_backend_parameters = memory_backend_parameters
self.data_store_params = data_store_params
class Kubernetes(Deploy):
def __init__(self, deploy_parameters: KubernetesParameters):
super().__init__(deploy_parameters)
self.deploy_parameters = deploy_parameters
if self.deploy_parameters.kubeconfig:
config.load_kube_config()
def __init__(self, params: KubernetesParameters):
super().__init__(params)
self.params = params
if self.params.kubeconfig:
k8sconfig.load_kube_config()
else:
config.load_incluster_config()
k8sconfig.load_incluster_config()
if not self.params.namespace:
_, current_context = k8sconfig.list_kube_config_contexts()
self.params.namespace = current_context['context']['namespace']
if not self.deploy_parameters.namespace:
_, current_context = config.list_kube_config_contexts()
self.deploy_parameters.namespace = current_context['context']['namespace']
self.nfs_pvc_name = 'nfs-checkpoint-pvc'
if os.environ.get('http_proxy'):
client.Configuration._default.proxy = os.environ.get('http_proxy')
k8sclient.Configuration._default.proxy = os.environ.get('http_proxy')
self.deploy_parameters.memory_backend_parameters.orchestrator_params = {'namespace': self.deploy_parameters.namespace}
self.memory_backend = get_memory_backend(self.deploy_parameters.memory_backend_parameters)
self.params.memory_backend_parameters.orchestrator_params = {'namespace': self.params.namespace}
self.memory_backend = get_memory_backend(self.params.memory_backend_parameters)
self.params.data_store_params.orchestrator_params = {'namespace': self.params.namespace}
self.data_store = get_data_store(self.params.data_store_params)
if self.params.data_store_params.store_type == "s3":
self.s3_access_key = None
self.s3_secret_key = None
if self.params.data_store_params.creds_file:
s3config = ConfigParser()
s3config.read(self.params.data_store_params.creds_file)
try:
self.s3_access_key = s3config.get('default', 'aws_access_key_id')
self.s3_secret_key = s3config.get('default', 'aws_secret_access_key')
except Error as e:
print("Error when reading S3 credentials file: %s", e)
else:
self.s3_access_key = os.environ.get('ACCESS_KEY_ID')
self.s3_secret_key = os.environ.get('SECRET_ACCESS_KEY')
def setup(self) -> bool:
self.memory_backend.deploy()
if not self.create_nfs_resources():
return False
return True
def create_nfs_resources(self):
persistent_volume = client.V1PersistentVolume(
api_version="v1",
kind="PersistentVolume",
metadata=client.V1ObjectMeta(
name='nfs-checkpoint-pv',
labels={'app': 'nfs-checkpoint-pv'}
),
spec=client.V1PersistentVolumeSpec(
access_modes=["ReadWriteMany"],
nfs=client.V1NFSVolumeSource(
path=self.deploy_parameters.nfs_path,
server=self.deploy_parameters.nfs_server
),
capacity={'storage': '10Gi'},
storage_class_name=""
)
)
api_client = client.CoreV1Api()
try:
api_client.create_persistent_volume(persistent_volume)
except client.rest.ApiException as e:
print("Got exception: %s\n while creating the NFS PV", e)
return False
persistent_volume_claim = client.V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=client.V1ObjectMeta(
name="nfs-checkpoint-pvc"
),
spec=client.V1PersistentVolumeClaimSpec(
access_modes=["ReadWriteMany"],
resources=client.V1ResourceRequirements(
requests={'storage': '10Gi'}
),
selector=client.V1LabelSelector(
match_labels={'app': 'nfs-checkpoint-pv'}
),
storage_class_name=""
)
)
try:
api_client.create_namespaced_persistent_volume_claim(self.deploy_parameters.namespace, persistent_volume_claim)
except client.rest.ApiException as e:
print("Got exception: %s\n while creating the NFS PVC", e)
if not self.data_store.deploy():
return False
return True
def deploy_trainer(self) -> bool:
trainer_params = self.deploy_parameters.run_type_params.get('trainer', None)
trainer_params = self.params.run_type_params.get('trainer', None)
if not trainer_params:
return False
trainer_params.command += ['--memory_backend_params', json.dumps(self.deploy_parameters.memory_backend_parameters.__dict__)]
trainer_params.command += ['--memory_backend_params', json.dumps(self.params.memory_backend_parameters.__dict__)]
trainer_params.command += ['--data_store_params', json.dumps(self.params.data_store_params.__dict__)]
name = "{}-{}".format(trainer_params.run_type, uuid.uuid4())
container = client.V1Container(
name=name,
image=trainer_params.image,
command=trainer_params.command,
args=trainer_params.arguments,
image_pull_policy='Always',
volume_mounts=[client.V1VolumeMount(
name='nfs-pvc',
mount_path=trainer_params.checkpoint_dir
)]
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={'app': name}),
spec=client.V1PodSpec(
containers=[container],
volumes=[client.V1Volume(
name="nfs-pvc",
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name=self.nfs_pvc_name
)
if self.params.data_store_params.store_type == "nfs":
container = k8sclient.V1Container(
name=name,
image=trainer_params.image,
command=trainer_params.command,
args=trainer_params.arguments,
image_pull_policy='Always',
volume_mounts=[k8sclient.V1VolumeMount(
name='nfs-pvc',
mount_path=trainer_params.checkpoint_dir
)]
),
)
deployment_spec = client.V1DeploymentSpec(
)
template = k8sclient.V1PodTemplateSpec(
metadata=k8sclient.V1ObjectMeta(labels={'app': name}),
spec=k8sclient.V1PodSpec(
containers=[container],
volumes=[k8sclient.V1Volume(
name="nfs-pvc",
persistent_volume_claim=k8sclient.V1PersistentVolumeClaimVolumeSource(
claim_name=self.nfs_pvc_name
)
)]
),
)
else:
container = k8sclient.V1Container(
name=name,
image=trainer_params.image,
command=trainer_params.command,
args=trainer_params.arguments,
image_pull_policy='Always',
env=[k8sclient.V1EnvVar("ACCESS_KEY_ID", self.s3_access_key),
k8sclient.V1EnvVar("SECRET_ACCESS_KEY", self.s3_secret_key)]
)
template = k8sclient.V1PodTemplateSpec(
metadata=k8sclient.V1ObjectMeta(labels={'app': name}),
spec=k8sclient.V1PodSpec(
containers=[container]
),
)
deployment_spec = k8sclient.V1DeploymentSpec(
replicas=trainer_params.num_replicas,
template=template,
selector=client.V1LabelSelector(
selector=k8sclient.V1LabelSelector(
match_labels={'app': name}
)
)
deployment = client.V1Deployment(
deployment = k8sclient.V1Deployment(
api_version='apps/v1',
kind='Deployment',
metadata=client.V1ObjectMeta(name=name),
metadata=k8sclient.V1ObjectMeta(name=name),
spec=deployment_spec
)
api_client = client.AppsV1Api()
api_client = k8sclient.AppsV1Api()
try:
api_client.create_namespaced_deployment(self.deploy_parameters.namespace, deployment)
api_client.create_namespaced_deployment(self.params.namespace, deployment)
trainer_params.orchestration_params['deployment_name'] = name
return True
except client.rest.ApiException as e:
except k8sclient.rest.ApiException as e:
print("Got exception: %s\n while creating deployment", e)
return False
def deploy_worker(self):
worker_params = self.deploy_parameters.run_type_params.get('worker', None)
worker_params = self.params.run_type_params.get('worker', None)
if not worker_params:
return False
worker_params.command += ['--memory_backend_params', json.dumps(self.deploy_parameters.memory_backend_parameters.__dict__)]
worker_params.command += ['--memory_backend_params', json.dumps(self.params.memory_backend_parameters.__dict__)]
worker_params.command += ['--data_store_params', json.dumps(self.params.data_store_params.__dict__)]
name = "{}-{}".format(worker_params.run_type, uuid.uuid4())
container = client.V1Container(
name=name,
image=worker_params.image,
command=worker_params.command,
args=worker_params.arguments,
image_pull_policy='Always',
volume_mounts=[client.V1VolumeMount(
name='nfs-pvc',
mount_path=worker_params.checkpoint_dir
)]
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={'app': name}),
spec=client.V1PodSpec(
containers=[container],
volumes=[client.V1Volume(
name="nfs-pvc",
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name=self.nfs_pvc_name
)
)],
),
)
if self.params.data_store_params.store_type == "nfs":
container = k8sclient.V1Container(
name=name,
image=worker_params.image,
command=worker_params.command,
args=worker_params.arguments,
image_pull_policy='Always',
volume_mounts=[k8sclient.V1VolumeMount(
name='nfs-pvc',
mount_path=worker_params.checkpoint_dir
)]
)
template = k8sclient.V1PodTemplateSpec(
metadata=k8sclient.V1ObjectMeta(labels={'app': name}),
spec=k8sclient.V1PodSpec(
containers=[container],
volumes=[k8sclient.V1Volume(
name="nfs-pvc",
persistent_volume_claim=k8sclient.V1PersistentVolumeClaimVolumeSource(
claim_name=self.nfs_pvc_name
)
)],
),
)
else:
container = k8sclient.V1Container(
name=name,
image=worker_params.image,
command=worker_params.command,
args=worker_params.arguments,
image_pull_policy='Always',
env=[k8sclient.V1EnvVar("ACCESS_KEY_ID", self.s3_access_key),
k8sclient.V1EnvVar("SECRET_ACCESS_KEY", self.s3_secret_key)]
)
template = k8sclient.V1PodTemplateSpec(
metadata=k8sclient.V1ObjectMeta(labels={'app': name}),
spec=k8sclient.V1PodSpec(
containers=[container]
)
)
deployment_spec = client.V1DeploymentSpec(
deployment_spec = k8sclient.V1DeploymentSpec(
replicas=worker_params.num_replicas,
template=template,
selector=client.V1LabelSelector(
selector=k8sclient.V1LabelSelector(
match_labels={'app': name}
)
)
deployment = client.V1Deployment(
deployment = k8sclient.V1Deployment(
api_version='apps/v1',
kind="Deployment",
metadata=client.V1ObjectMeta(name=name),
metadata=k8sclient.V1ObjectMeta(name=name),
spec=deployment_spec
)
api_client = client.AppsV1Api()
api_client = k8sclient.AppsV1Api()
try:
api_client.create_namespaced_deployment(self.deploy_parameters.namespace, deployment)
api_client.create_namespaced_deployment(self.params.namespace, deployment)
worker_params.orchestration_params['deployment_name'] = name
return True
except client.rest.ApiException as e:
except k8sclient.rest.ApiException as e:
print("Got exception: %s\n while creating deployment", e)
return False
@@ -237,19 +249,19 @@ class Kubernetes(Deploy):
pass
def trainer_logs(self):
trainer_params = self.deploy_parameters.run_type_params.get('trainer', None)
trainer_params = self.params.run_type_params.get('trainer', None)
if not trainer_params:
return
api_client = client.CoreV1Api()
api_client = k8sclient.CoreV1Api()
pod = None
try:
pods = api_client.list_namespaced_pod(self.deploy_parameters.namespace, label_selector='app={}'.format(
pods = api_client.list_namespaced_pod(self.params.namespace, label_selector='app={}'.format(
trainer_params.orchestration_params['deployment_name']
))
pod = pods.items[0]
except client.rest.ApiException as e:
except k8sclient.rest.ApiException as e:
print("Got exception: %s\n while reading pods", e)
return
@@ -264,17 +276,17 @@ class Kubernetes(Deploy):
# Try to tail the pod logs
try:
print(corev1_api.read_namespaced_pod_log(
pod_name, self.deploy_parameters.namespace, follow=True
pod_name, self.params.namespace, follow=True
), flush=True)
except client.rest.ApiException as e:
except k8sclient.rest.ApiException as e:
pass
# This part will get executed if the pod is one of the following phases: not ready, failed or terminated.
# Check if the pod has errored out, else just try again.
# Get the pod
try:
pod = corev1_api.read_namespaced_pod(pod_name, self.deploy_parameters.namespace)
except client.rest.ApiException as e:
pod = corev1_api.read_namespaced_pod(pod_name, self.params.namespace)
except k8sclient.rest.ApiException as e:
continue
if not hasattr(pod, 'status') or not pod.status:
@@ -293,18 +305,19 @@ class Kubernetes(Deploy):
return
def undeploy(self):
trainer_params = self.deploy_parameters.run_type_params.get('trainer', None)
api_client = client.AppsV1Api()
delete_options = client.V1DeleteOptions()
trainer_params = self.params.run_type_params.get('trainer', None)
api_client = k8sclient.AppsV1Api()
delete_options = k8sclient.V1DeleteOptions()
if trainer_params:
try:
api_client.delete_namespaced_deployment(trainer_params.orchestration_params['deployment_name'], self.deploy_parameters.namespace, delete_options)
except client.rest.ApiException as e:
api_client.delete_namespaced_deployment(trainer_params.orchestration_params['deployment_name'], self.params.namespace, delete_options)
except k8sclient.rest.ApiException as e:
print("Got exception: %s\n while deleting trainer", e)
worker_params = self.deploy_parameters.run_type_params.get('worker', None)
worker_params = self.params.run_type_params.get('worker', None)
if worker_params:
try:
api_client.delete_namespaced_deployment(worker_params.orchestration_params['deployment_name'], self.deploy_parameters.namespace, delete_options)
except client.rest.ApiException as e:
api_client.delete_namespaced_deployment(worker_params.orchestration_params['deployment_name'], self.params.namespace, delete_options)
except k8sclient.rest.ApiException as e:
print("Got exception: %s\n while deleting workers", e)
self.memory_backend.undeploy()
self.data_store.undeploy()