1
0
mirror of https://github.com/gryf/coach.git synced 2025-12-17 19:20:19 +01:00

Adding nfs pv, pvc, waiting for memory to be full

This commit is contained in:
Ajay Deshpande
2018-09-19 09:03:11 -07:00
committed by zach dwiel
parent 13d81f65b9
commit 98850464cc
4 changed files with 188 additions and 31 deletions

View File

@@ -68,7 +68,10 @@ class DistributedExperienceReplay(Memory):
"""
Get the number of transitions in the ER
"""
try:
return self.redis_connection.info(section='keyspace')['db{}'.format(self.db)]['keys']
except Exception as e:
return 0
def sample(self, size: int) -> List[Transition]:
"""

View File

@@ -1,13 +1,15 @@
import uuid
from rl_coach.orchestrators.deploy import Deploy, DeployParameters
from kubernetes import client, config
class KubernetesParameters(DeployParameters):
def __init__(self, image: str, command: list(), arguments: list() = list(), synchronized: bool = False,
def __init__(self, name: str, image: str, command: list(), arguments: list() = list(), synchronized: bool = False,
num_workers: int = 1, kubeconfig: str = None, namespace: str = None, redis_ip: str = None,
redis_port: int = None, redis_db: int = 0):
redis_port: int = None, redis_db: int = 0, nfs_server: str = None, nfs_path: str = None,
checkpoint_dir: str = '/checkpoint'):
self.image = image
self.synchronized = synchronized
self.command = command
@@ -18,6 +20,10 @@ class KubernetesParameters(DeployParameters):
self.redis_ip = redis_ip
self.redis_port = redis_port
self.redis_db = redis_db
self.nfs_server = nfs_server
self.nfs_path = nfs_path
self.checkpoint_dir = checkpoint_dir
self.name = name
class Kubernetes(Deploy):
@@ -25,8 +31,6 @@ class Kubernetes(Deploy):
def __init__(self, deploy_parameters: KubernetesParameters):
super().__init__(deploy_parameters)
self.deploy_parameters = deploy_parameters
def setup(self) -> bool:
if self.deploy_parameters.kubeconfig:
config.load_kube_config()
else:
@@ -35,6 +39,9 @@ class Kubernetes(Deploy):
if not self.deploy_parameters.namespace:
_, current_context = config.list_kube_config_contexts()
self.deploy_parameters.namespace = current_context['context']['namespace']
self.nfs_pvc_name = 'nfs-checkpoint-pvc'
def setup(self) -> bool:
if not self.deploy_parameters.redis_ip:
# Need to spin up a redis service and a deployment.
@@ -42,10 +49,61 @@ class Kubernetes(Deploy):
print("Failed to setup redis")
return False
self.deploy_parameters.command += ['--redis_ip', self.deploy_parameters.redis_ip, '--redis_port', '{}'.format(self.deploy_parameters.redis_port)]
if not self.create_nfs_resources():
return False
return True
def create_nfs_resources(self):
persistent_volume = client.V1PersistentVolume(
api_version="v1",
kind="PersistentVolume",
metadata=client.V1ObjectMeta(
name='nfs-checkpoint-pv',
labels={'app': 'nfs-checkpoint-pv'}
),
spec=client.V1PersistentVolumeSpec(
access_modes=["ReadWriteMany"],
nfs=client.V1NFSVolumeSource(
path=self.deploy_parameters.nfs_path,
server=self.deploy_parameters.nfs_server
),
capacity={'storage': '10Gi'},
storage_class_name=""
)
)
api_client = client.CoreV1Api()
try:
api_client.create_persistent_volume(persistent_volume)
except client.rest.ApiException as e:
print("Got exception: %s\n while creating the NFS PV", e)
return False
persistent_volume_claim = client.V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=client.V1ObjectMeta(
name="nfs-checkpoint-pvc"
),
spec=client.V1PersistentVolumeClaimSpec(
access_modes=["ReadWriteMany"],
resources=client.V1ResourceRequirements(
requests={'storage': '10Gi'}
),
selector=client.V1LabelSelector(
match_labels={'app': 'nfs-checkpoint-pv'}
),
storage_class_name=""
)
)
try:
api_client.create_namespaced_persistent_volume_claim(self.deploy_parameters.namespace, persistent_volume_claim)
except client.rest.ApiException as e:
print("Got exception: %s\n while creating the NFS PVC", e)
return False
return True
def deploy_redis(self) -> bool:
container = client.V1Container(
name="redis-server",
@@ -107,37 +165,52 @@ class Kubernetes(Deploy):
return False
def deploy(self) -> bool:
self.deploy_parameters.command += ['--redis_ip', self.deploy_parameters.redis_ip, '--redis_port', '{}'.format(self.deploy_parameters.redis_port)]
if self.deploy_parameters.synchronized:
return self.create_k8s_job()
else:
return self.create_k8s_deployment()
else:
return self.create_k8s_job()
def create_k8s_deployment(self) -> bool:
name = "{}-{}".format(self.deploy_parameters.name, uuid.uuid4())
container = client.V1Container(
name="worker",
name=name,
image=self.deploy_parameters.image,
command=self.deploy_parameters.command,
args=self.deploy_parameters.arguments,
image_pull_policy='Always'
image_pull_policy='Always',
volume_mounts=[client.V1VolumeMount(
name='nfs-pvc',
mount_path=self.deploy_parameters.checkpoint_dir
)]
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={'app': 'worker'}),
metadata=client.V1ObjectMeta(labels={'app': name}),
spec=client.V1PodSpec(
containers=[container]
containers=[container],
volumes=[client.V1Volume(
name="nfs-pvc",
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name=self.nfs_pvc_name
)
)]
),
)
deployment_spec = client.V1DeploymentSpec(
replicas=self.deploy_parameters.num_workers,
template=template,
selector=client.V1LabelSelector(
match_labels={'app': 'worker'}
match_labels={'app': name}
)
)
deployment = client.V1Deployment(
api_version='apps/v1',
kind='Deployment',
metadata=client.V1ObjectMeta(name='rollout-worker'),
metadata=client.V1ObjectMeta(name=name),
spec=deployment_spec
)
@@ -150,4 +223,50 @@ class Kubernetes(Deploy):
return False
def create_k8s_job(self):
pass
name = "{}-{}".format(self.deploy_parameters.name, uuid.uuid4())
container = client.V1Container(
name=name,
image=self.deploy_parameters.image,
command=self.deploy_parameters.command,
args=self.deploy_parameters.arguments,
image_pull_policy='Always',
volume_mounts=[client.V1VolumeMount(
name='nfs-pvc',
mount_path=self.deploy_parameters.checkpoint_dir
)]
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={'app': name}),
spec=client.V1PodSpec(
containers=[container],
volumes=[client.V1Volume(
name="nfs-pvc",
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name=self.nfs_pvc_name
)
)],
restart_policy='Never'
),
)
job_spec = client.V1JobSpec(
parallelism=self.deploy_parameters.num_workers,
template=template,
completions=2147483647
)
job = client.V1Job(
api_version='batch/v1',
kind='Job',
metadata=client.V1ObjectMeta(name=name),
spec=job_spec
)
api_client = client.BatchV1Api()
try:
api_client.create_namespaced_job(self.deploy_parameters.namespace, job)
return True
except client.rest.ApiException as e:
print("Got exception: %s\n while creating deployment", e)
return False

View File

@@ -3,30 +3,48 @@ import argparse
from rl_coach.orchestrators.kubernetes_orchestrator import KubernetesParameters, Kubernetes
def main(preset, image='ajaysudh/testing:coach', redis_ip='redis-service.ajay.svc'):
def main(preset: str, image: str='ajaysudh/testing:coach', redis_ip: str=None, redis_port:int=None, num_workers: int=1, nfs_server: str="", nfs_path: str=""):
rollout_command = ['python3', 'rl_coach/rollout_worker.py', '-p', preset]
training_command = ['python3', 'rl_coach/training_worker.py', '-p', preset]
rollout_params = KubernetesParameters(image, rollout_command, redis_ip=redis_ip, redis_port=6379, num_workers=1)
training_params = KubernetesParameters(image, training_command, redis_ip=redis_ip, redis_port=6379, num_workers=1)
"""
TODO:
1. Create a NFS backed PV for checkpointing.
a. Include that in both (worker, trainer) containers.
b. Change checkpoint writing logic to always write to a temporary file and then rename.
2. Test e2e 1 loop.
a. Trainer writes a checkpoint
b. Rollout worker picks it and gathers experience, writes back to redis.
c. 1 rollout worker, 1 trainer.
3. Trainer should be a job (not a deployment)
a. When all the epochs of training are done, workers should also be deleted.
4. Test e2e with multiple rollout workers.
5. Test e2e with multiple rollout workers and multiple loops.
"""
training_params = KubernetesParameters("train", image, training_command, kubeconfig='~/.kube/config', redis_ip=redis_ip, redis_port=redis_port,
nfs_server=nfs_server, nfs_path=nfs_path)
training_obj = Kubernetes(training_params)
if not training_obj.setup():
print("Could not setup")
return
rollout_obj = Kubernetes(training_params)
if not rollout_obj.setup():
print("Could not setup")
rollout_params = KubernetesParameters("worker", image, rollout_command, kubeconfig='~/.kube/config', redis_ip=training_params.redis_ip, redis_port=training_params.redis_port, num_workers=num_workers)
rollout_obj = Kubernetes(rollout_params)
# if not rollout_obj.setup():
# print("Could not setup")
if training_obj.deploy():
print("Successfully deployed")
else:
print("Could not deploy")
return
if rollout_obj.deploy():
print("Successfully deployed")
else:
print("Could not deploy")
return
if __name__ == '__main__':
@@ -39,10 +57,19 @@ if __name__ == '__main__':
help="(string) Name of a preset to run (class name from the 'presets' directory.)",
type=str,
required=True)
parser.add_argument('-ns', '--nfs-server',
help="(string) Addresss of the nfs server.)",
type=str,
required=True)
parser.add_argument('-np', '--nfs-path',
help="(string) Exported path for the nfs server",
type=str,
required=True)
# parser.add_argument('--checkpoint_dir',
# help='(string) Path to a folder containing a checkpoint to write the model to.',
# type=str,
# default='/checkpoint')
args = parser.parse_args()
main(preset=args.preset, image=args.image)
main(preset=args.preset, image=args.image, nfs_server=args.nfs_server, nfs_path=args.nfs_path)

View File

@@ -1,22 +1,19 @@
"""
"""
import argparse
import time
from rl_coach.base_parameters import TaskParameters
from rl_coach.coach import expand_preset
from rl_coach import core_types
from rl_coach.utils import short_dynamic_import
from rl_coach.memories.non_episodic.distributed_experience_replay import DistributedExperienceReplay
from rl_coach.memories.memory import MemoryGranularity
# Q: specify alternative distributed memory, or should this go in the preset?
# A: preset must define distributed memory to be used. we aren't going to take a non-distributed preset and automatically distribute it.
def heatup(graph_manager):
num_steps = graph_manager.schedule_params.heatup_steps.num_steps
while len(graph_manager.agent_params.memory) < num_steps:
time.sleep(1)
def training_worker(graph_manager, checkpoint_dir):
"""
restore a checkpoint then perform rollouts using the restored model
@@ -29,10 +26,20 @@ def training_worker(graph_manager, checkpoint_dir):
# save randomly initialized graph
graph_manager.save_checkpoint()
heatup(graph_manager)
memory = DistributedExperienceReplay(max_size=(MemoryGranularity.Transitions, 1000000),
redis_ip=graph_manager.agent_params.memory.redis_ip,
redis_port=graph_manager.agent_params.memory.redis_port)
while(memory.num_transitions() < 100):
time.sleep(10)
# TODO: critical: wait for minimum number of rollouts in memory before training
# TODO: Q: training steps passed into graph_manager.train ignored?
# TODO: specify training steps between checkpoints (in preset?)
# TODO: replace while true with what? number of steps, convergence, time, ...
# TODO: low: move evaluate out of this process
# training loop
for _ in range(10):
for _ in range(40):
graph_manager.phase = core_types.RunPhase.TRAIN
graph_manager.train(core_types.TrainingSteps(1))
graph_manager.phase = core_types.RunPhase.UNDEFINED
@@ -72,5 +79,6 @@ def main():
checkpoint_dir=args.checkpoint_dir,
)
if __name__ == '__main__':
main()