处理故障和节点抢占(已弃用 API)#
从 Train Worker 故障中自动恢复#
Ray Train 内置了容错机制,可从 worker 故障(即 RayActorError
)中恢复。检测到故障时,worker 将被关闭并添加新的 worker。
训练函数将重新启动,但可以通过检查点恢复先前执行的进度。
从 worker 故障中恢复的每个实例都被视为一次重试。重试次数可通过传递给 Trainer
的 RunConfig
参数集中 FailureConfig
的 max_failures
属性进行配置。
import ray.train
# Tries to recover a run up to this many times.
failure_config = ray.train.FailureConfig(max_failures=2)
# No limit on the number of retries.
failure_config = ray.train.FailureConfig(max_failures=-1)
将恢复哪个检查点?#
Ray Train 将自动从报告给 Ray Train 的最新可用检查点恢复训练。
这将是传递给 train.report()
的最后一个检查点。
恢复 Ray Train 实验#
在实验层面,Trainer 恢复允许您从先前中断的实验中断处恢复。
Train 实验可能由于以下原因之一中断
实验被手动中断(例如,Ctrl+C,或被抢占的头节点实例)。
头节点崩溃(例如,OOM 或其他运行时错误)。
整个集群宕机(例如,影响所有节点的网络错误)。
Ray Train 的所有内置 trainer 都可能进行 Trainer 恢复,但在示例中我们使用 TorchTrainer
进行演示。我们也使用 <Framework>Trainer
来指代所有内置 trainer 共享的方法。
假设您的初始 Train 实验配置如下。实际训练循环仅用于演示:重要细节是已经实现了保存和加载检查点的逻辑。
import os
import tempfile
from typing import Dict, Optional
import torch
import ray
from ray import train
from ray.train import Checkpoint
from ray.train.torch import TorchTrainer
def get_datasets() -> Dict[str, ray.data.Dataset]:
return {"train": ray.data.from_items([{"x": i, "y": 2 * i} for i in range(10)])}
def train_loop_per_worker(config: dict):
from torchvision.models import resnet18
model = resnet18()
# Checkpoint loading
checkpoint: Optional[Checkpoint] = train.get_checkpoint()
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
model.load_state_dict(model_state_dict)
model = train.torch.prepare_model(model)
train_ds = train.get_dataset_shard("train")
for epoch in range(5):
# Do some training...
# Checkpoint saving
with tempfile.TemporaryDirectory() as tmpdir:
torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
train.report({"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir))
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
datasets=get_datasets(),
scaling_config=train.ScalingConfig(num_workers=2),
run_config=train.RunConfig(
name="dl_trainer_restore", storage_path=os.path.expanduser("~/ray_results")
),
)
result = trainer.fit()
实验结果和检查点保存到由 RunConfig
配置的路径。如果实验因上述原因之一中断,请使用此路径恢复
from ray.train.torch import TorchTrainer
restored_trainer = TorchTrainer.restore(
path=os.path.expanduser("~/ray_results/dl_trainer_restore"),
datasets=get_datasets(),
)
提示
您还可以从远程路径恢复(例如,从存储在 s3 存储桶中的实验目录)。
original_trainer = TorchTrainer(
# ...
run_config=train.RunConfig(
# Configure cloud storage
storage_path="s3://results-bucket",
name="dl_trainer_restore",
),
)
result = trainer.fit()
restored_trainer = TorchTrainer.restore(
"s3://results-bucket/dl_trainer_restore",
datasets=get_datasets(),
)
注意
不同的 trainer 可以在恢复时选择重新指定更多参数。如果最初提供了数据集,则只有数据集需要在恢复时重新指定。
TorchTrainer.restore
、TensorflowTrainer.restore
和 HorovodTrainer.restore
可以接受与其父类 DataParallelTrainer.restore
相同的参数。
除非另有说明,否则其他 trainer 将接受与 BaseTrainer.restore
相同的参数。
自动恢复#
添加下面的分支逻辑将允许您在中断后运行相同的脚本,从上次运行中断的地方继续训练。请注意,我们使用 <Framework>Trainer.can_restore
工具方法来确定给定实验目录的存在和有效性。
experiment_path = os.path.expanduser("~/ray_results/dl_restore_autoresume")
if TorchTrainer.can_restore(experiment_path):
trainer = TorchTrainer.restore(experiment_path, datasets=get_datasets())
result = trainer.fit()
else:
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
datasets=get_datasets(),
scaling_config=train.ScalingConfig(num_workers=2),
run_config=train.RunConfig(
storage_path=os.path.expanduser("~/ray_results"),
name="dl_restore_autoresume",
),
)
result = trainer.fit()
另请参阅
请参阅 BaseTrainer.restore
文档字符串以获取完整示例。
注意
<Framework>Trainer.restore
与 <Framework>Trainer(..., resume_from_checkpoint=...)
不同。resume_from_checkpoint
用于启动一个新的 Train 实验,该实验将结果写入新目录并从迭代 0 开始。
<Framework>Trainer.restore
用于继续现有实验,新结果将继续追加到现有日志中。