mirror of
https://github.com/gryf/coach.git
synced 2025-12-17 19:20:19 +01:00
Implement frame-work agnostic rollout and training workers (#137)
* Added checkpoint state file to coach checkpointing. * Removed TF specific code from rollout_worker, training_worker, and s3_data_store
This commit is contained in:
committed by
Balaji Subramaniam
parent
4a6c404070
commit
5332013bd1
@@ -12,37 +12,26 @@ import os
|
||||
import math
|
||||
|
||||
from rl_coach.base_parameters import TaskParameters, DistributedCoachSynchronizationType
|
||||
from rl_coach.checkpoint import CheckpointStateFile, CheckpointStateReader
|
||||
from rl_coach.core_types import EnvironmentSteps, RunPhase, EnvironmentEpisodes
|
||||
from google.protobuf import text_format
|
||||
from tensorflow.python.training.checkpoint_state_pb2 import CheckpointState
|
||||
from rl_coach.data_stores.data_store import SyncFiles
|
||||
|
||||
|
||||
def has_checkpoint(checkpoint_dir):
|
||||
"""
|
||||
True if a checkpoint is present in checkpoint_dir
|
||||
"""
|
||||
if os.path.isdir(checkpoint_dir):
|
||||
if len(os.listdir(checkpoint_dir)) > 0:
|
||||
return os.path.isfile(os.path.join(checkpoint_dir, "checkpoint"))
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def wait_for_checkpoint(checkpoint_dir, data_store=None, timeout=10):
|
||||
"""
|
||||
block until there is a checkpoint in checkpoint_dir
|
||||
"""
|
||||
chkpt_state_file = CheckpointStateFile(checkpoint_dir)
|
||||
for i in range(timeout):
|
||||
if data_store:
|
||||
data_store.load_from_store()
|
||||
|
||||
if has_checkpoint(checkpoint_dir):
|
||||
if chkpt_state_file.read() is not None:
|
||||
return
|
||||
time.sleep(10)
|
||||
|
||||
# one last time
|
||||
if has_checkpoint(checkpoint_dir):
|
||||
if chkpt_state_file.read() is not None:
|
||||
return
|
||||
|
||||
raise ValueError((
|
||||
@@ -54,21 +43,6 @@ def wait_for_checkpoint(checkpoint_dir, data_store=None, timeout=10):
|
||||
))
|
||||
|
||||
|
||||
def data_store_ckpt_load(data_store):
|
||||
while True:
|
||||
data_store.load_from_store()
|
||||
time.sleep(10)
|
||||
|
||||
|
||||
def get_latest_checkpoint(checkpoint_dir):
|
||||
if os.path.exists(os.path.join(checkpoint_dir, 'checkpoint')):
|
||||
ckpt = CheckpointState()
|
||||
contents = open(os.path.join(checkpoint_dir, 'checkpoint'), 'r').read()
|
||||
text_format.Merge(contents, ckpt)
|
||||
rel_path = os.path.relpath(ckpt.model_checkpoint_path, checkpoint_dir)
|
||||
return int(rel_path.split('_Step')[0])
|
||||
|
||||
|
||||
def should_stop(checkpoint_dir):
|
||||
return os.path.exists(os.path.join(checkpoint_dir, SyncFiles.FINISHED.value))
|
||||
|
||||
@@ -83,6 +57,7 @@ def rollout_worker(graph_manager, data_store, num_workers, task_parameters):
|
||||
graph_manager.create_graph(task_parameters)
|
||||
with graph_manager.phase_context(RunPhase.TRAIN):
|
||||
|
||||
chkpt_state_reader = CheckpointStateReader(checkpoint_dir, checkpoint_state_optional=False)
|
||||
last_checkpoint = 0
|
||||
|
||||
act_steps = math.ceil((graph_manager.agent_params.algorithm.num_consecutive_playing_steps.num_steps)/num_workers)
|
||||
@@ -97,20 +72,20 @@ def rollout_worker(graph_manager, data_store, num_workers, task_parameters):
|
||||
elif type(graph_manager.agent_params.algorithm.num_consecutive_playing_steps) == EnvironmentEpisodes:
|
||||
graph_manager.act(EnvironmentEpisodes(num_steps=act_steps))
|
||||
|
||||
new_checkpoint = get_latest_checkpoint(checkpoint_dir)
|
||||
|
||||
new_checkpoint = chkpt_state_reader.get_latest()
|
||||
if graph_manager.agent_params.algorithm.distributed_coach_synchronization_type == DistributedCoachSynchronizationType.SYNC:
|
||||
while new_checkpoint < last_checkpoint + 1:
|
||||
while new_checkpoint is None or new_checkpoint.num < last_checkpoint + 1:
|
||||
if should_stop(checkpoint_dir):
|
||||
break
|
||||
if data_store:
|
||||
data_store.load_from_store()
|
||||
new_checkpoint = get_latest_checkpoint(checkpoint_dir)
|
||||
new_checkpoint = chkpt_state_reader.get_latest()
|
||||
|
||||
graph_manager.restore_checkpoint()
|
||||
|
||||
if graph_manager.agent_params.algorithm.distributed_coach_synchronization_type == DistributedCoachSynchronizationType.ASYNC:
|
||||
if new_checkpoint > last_checkpoint:
|
||||
if new_checkpoint is not None and new_checkpoint.num > last_checkpoint:
|
||||
graph_manager.restore_checkpoint()
|
||||
|
||||
last_checkpoint = new_checkpoint
|
||||
if new_checkpoint is not None:
|
||||
last_checkpoint = new_checkpoint.num
|
||||
|
||||
Reference in New Issue
Block a user