配置持久存储#

Ray Train 运行会生成可以保存到持久存储位置的检查点

../../_images/persistent_storage_checkpoint.png

多个工作节点分布在多个节点上,将检查点上传到持久存储的示例。#

Ray Train 要求所有工作节点都能写入同一个持久存储位置。因此,Ray Train 要求使用某种外部持久存储,例如云存储(如 S3、GCS)或共享文件系统(如 AWS EFS、Google Filestore、HDFS)来进行多节点训练。

以下是持久存储支持的一些功能:

  • 检查点和容错:将检查点保存到持久存储位置,可以在节点发生故障时从最后一个检查点恢复训练。有关如何设置检查点的详细指南,请参阅保存和加载检查点

  • 实验后分析:在 Ray 集群终止后,可以集中存储最佳检查点和超参数配置等数据的统一位置。

  • 打通训练/微调与下游服务和批量推理任务:您可以轻松访问模型和工件,与他人共享或用于下游任务。

云存储 (AWS S3, Google Cloud Storage)#

提示

云存储是推荐的持久存储选项。

通过将存储桶 URI 指定为 RunConfig(storage_path) 来使用云存储。

from ray import train
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    ...,
    run_config=train.RunConfig(
        storage_path="s3://bucket-name/sub-path/",
        name="experiment_name",
    )
)

确保 Ray 集群中的所有节点都能访问云存储,以便将工作节点的输出上传到共享的云存储桶。在此示例中,所有文件都上传到共享存储 s3://bucket-name/sub-path/experiment_name 以便进一步处理。

共享文件系统 (NFS, HDFS)#

通过将共享存储路径指定为 RunConfig(storage_path) 来使用。

from ray import train
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    ...,
    run_config=train.RunConfig(
        storage_path="/mnt/cluster_storage",
        # HDFS example:
        # storage_path=f"hdfs://{hostname}:{port}/subpath",
        name="experiment_name",
    )
)

确保 Ray 集群中的所有节点都能访问共享文件系统,例如 AWS EFS、Google Cloud Filestore 或 HDFS,以便将输出保存到那里。在此示例中,所有文件都保存到 /mnt/cluster_storage/experiment_name 以便进一步处理。

本地存储#

单节点集群的本地存储使用#

如果您只是在单节点上运行实验(例如,在笔记本电脑上),Ray Train 将使用本地文件系统作为检查点和其他工件的存储位置。默认情况下,结果将保存到 `~/ray_results` 中的一个子目录,该子目录具有自动生成的唯一名称,除非您使用 RunConfig 中的 storage_pathname 进行自定义。

from ray import train
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    ...,
    run_config=train.RunConfig(
        storage_path="/tmp/custom/storage/path",
        name="experiment_name",
    )
)

在此示例中,所有实验结果都可以在本地的 /tmp/custom/storage/path/experiment_name 中找到,以便进一步处理。

多节点集群的本地存储使用#

警告

在多节点上运行时,不再支持使用主节点的本地文件系统作为持久存储位置。

如果您使用 ray.train.report(..., checkpoint=...) 保存检查点并在多节点集群上运行,如果未设置 NFS 或云存储,Ray Train 将引发错误。这是因为 Ray Train 要求所有工作节点都能将检查点写入同一个持久存储位置。

如果您的训练循环不保存检查点,报告的指标仍将汇总到主节点的本地存储路径。

有关更多信息,请参阅此问题

自定义存储#

如果以上情况不符合您的需求,Ray Train 可以支持自定义文件系统并执行自定义逻辑。Ray Train 标准化了 pyarrow.fs.FileSystem 接口来与存储进行交互(请参阅此处 API 参考)。

默认情况下,传递 storage_path=s3://bucket-name/sub-path/ 将使用 pyarrow 的默认 S3 文件系统实现来上传文件。(请参阅其他默认实现。)

通过将 pyarrow.fs.FileSystem 的实现提供给 RunConfig(storage_filesystem) 来实现自定义存储上传和下载逻辑。

警告

在提供自定义文件系统时,关联的 storage_path 预计是*没有协议前缀*的合格文件系统路径。

例如,如果您为 s3://bucket-name/sub-path/ 提供自定义 S3 文件系统,则 storage_path 应为 bucket-name/sub-path/,其中 s3:// 已被剥离。请参阅下面的示例以了解用法。

import pyarrow.fs

from ray import train
from ray.train.torch import TorchTrainer

fs = pyarrow.fs.S3FileSystem(
    endpoint_override="https://:9000",
    access_key=...,
    secret_key=...
)

trainer = TorchTrainer(
    ...,
    run_config=train.RunConfig(
        storage_filesystem=fs,
        storage_path="bucket-name/sub-path",
        name="unique-run-id",
    )
)

fsspec 文件系统#

fsspec 提供了许多文件系统实现,例如 s3fsgcsfs 等。

您可以通过将 fsspec 文件系统包装在 pyarrow.fs 工具中来使用这些实现中的任何一个。

# Make sure to install: `pip install -U s3fs`
import s3fs
import pyarrow.fs

s3_fs = s3fs.S3FileSystem(
    key='miniokey...',
    secret='asecretkey...',
    endpoint_url='https://...'
)
custom_fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(s3_fs))

run_config = RunConfig(storage_path="minio_bucket", storage_filesystem=custom_fs)

MinIO 和其他 S3 兼容存储#

您可以遵循上述示例来配置自定义 S3 文件系统以与 MinIO 配合使用。

请注意,将这些作为查询参数直接包含在 storage_path URI 中是另一种选择。

from ray import train
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    ...,
    run_config=train.RunConfig(
        storage_path="s3://bucket-name/sub-path?endpoint_override=https://:9000",
        name="unique-run-id",
    )
)

Ray Train 输出概述#

到目前为止,我们已经讨论了如何配置 Ray Train 输出的存储位置。让我们通过一个具体的例子来了解这些输出具体是什么,以及它们在存储中的结构。

另请参阅

这个例子包括了检查点,在保存和加载检查点中有详细介绍。

import os
import tempfile

import ray.train
from ray.train import Checkpoint
from ray.train.torch import TorchTrainer

def train_fn(config):
    for i in range(10):
        # Training logic here
        metrics = {"loss": ...}

        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            torch.save(..., os.path.join(temp_checkpoint_dir, "checkpoint.pt"))
            train.report(
                metrics,
                checkpoint=Checkpoint.from_directory(temp_checkpoint_dir)
            )

trainer = TorchTrainer(
    train_fn,
    scaling_config=ray.train.ScalingConfig(num_workers=2),
    run_config=ray.train.RunConfig(
        storage_path="s3://bucket-name/sub-path/",
        name="unique-run-id",
    )
)
result: train.Result = trainer.fit()
last_checkpoint: Checkpoint = result.checkpoint

以下是所有将持久化到存储的文件列表:

{RunConfig.storage_path}  (ex: "s3://bucket-name/sub-path/")
└── {RunConfig.name}      (ex: "unique-run-id")               <- Train run output directory
    ├── *_snapshot.json                                       <- Train run metadata files (DeveloperAPI)
    ├── checkpoint_epoch=0/                                   <- Checkpoints
    ├── checkpoint_epoch=1/
    └── ...

trainer.fit 返回的 ResultCheckpoint 对象是访问这些文件中数据的最简单方式。

result.filesystem, result.path
# S3FileSystem, "bucket-name/sub-path/unique-run-id"

result.checkpoint.filesystem, result.checkpoint.path
# S3FileSystem, "bucket-name/sub-path/unique-run-id/checkpoint_epoch=0"

有关与训练结果交互的完整指南,请参阅检查训练结果

高级配置#

保留原始当前工作目录#

Ray Train 会将每个工作节点的当前工作目录更改为同一路径。

默认情况下,此路径是 Ray 会话目录(例如,/tmp/ray/session_latest)的一个子目录,该目录也是其他 Ray 日志和临时文件转储的位置。Ray 会话目录的位置可以自定义

要禁用 Ray Train 更改工作目录的默认行为,请设置环境变量 RAY_CHDIR_TO_TRIAL_DIR=0

这在您希望训练工作节点能够从启动训练脚本的目录访问相对路径时很有用。

提示

在分布式集群上运行时,您需要确保所有工作节点都有一个镜像的工作目录,以访问相同的相对路径。

实现这一点的一种方法是在 Ray 运行时环境中设置工作目录

import os

import ray
import ray.train
from ray.train.torch import TorchTrainer

os.environ["RAY_CHDIR_TO_TRIAL_DIR"] = "0"

# Write some file in the current working directory
with open("./data.txt", "w") as f:
    f.write("some data")

# Set the working directory in the Ray runtime environment
ray.init(runtime_env={"working_dir": "."})

def train_fn_per_worker(config):
    # Check that each worker can access the working directory
    # NOTE: The working directory is copied to each worker and is read only.
    assert os.path.exists("./data.txt"), os.getcwd()

trainer = TorchTrainer(
    train_fn_per_worker,
    scaling_config=ray.train.ScalingConfig(num_workers=2),
    run_config=ray.train.RunConfig(
        # storage_path=...,
    ),
)
trainer.fit()

已弃用#

以下部分描述的行为自 Ray 2.43 起已被弃用,在 Ray Train V2 中将不再支持,Ray Train V2 是 Ray Train 实现和部分 API 的大修。

有关更多信息,请参阅以下资源:

(已弃用) 持久化训练工件#

注意

此持久化训练工作节点工件的功能自 Ray 2.43 起已弃用。该功能依赖于 Ray Tune 的本地工作目录抽象,其中每个工作节点的本地文件将被复制到存储。Ray Train V2 将这两个库解耦,因此这个 API(本身价值有限)已被弃用。

在上面的示例中,我们在训练循环中将一些工件保存到工作节点的*当前工作目录*。如果您正在训练一个稳定扩散模型,您可以每隔一段时间保存一些生成的样本图像作为训练工件。

默认情况下,Ray Train 会将每个工作节点的当前工作目录更改为运行的本地暂存目录内部。这样,所有分布式训练工作节点都共享相同的工作目录的绝对路径。有关如何禁用此默认行为(如果您希望训练工作节点保留其原始工作目录,则很有用),请参阅下方

如果 RunConfig(SyncConfig(sync_artifacts=True)),则保存在此目录中的所有工件都将持久化到存储。

工件同步的频率可以通过 SyncConfig 进行配置。请注意,此行为默认是关闭的。

这是 Train 运行输出目录外观的示例,包含工作节点工件。

s3://bucket-name/sub-path (RunConfig.storage_path)
└── experiment_name (RunConfig.name)          <- The "experiment directory"
    ├── experiment_state-*.json
    ├── basic-variant-state-*.json
    ├── trainer.pkl
    ├── tuner.pkl
    └── TorchTrainer_46367_00000_0_...        <- The "trial directory"
        ├── events.out.tfevents...            <- Tensorboard logs of reported metrics
        ├── result.json                       <- JSON log file of reported metrics
        ├── checkpoint_000000/                <- Checkpoints
        ├── checkpoint_000001/
        ├── ...
        ├── artifact-rank=0-iter=0.txt        <- Worker artifacts
        ├── artifact-rank=1-iter=0.txt
        └── ...

警告

每个工作节点保存的工件都将被同步到存储。如果您有多个工作节点位于同一节点上,请确保工作节点不会删除其共享工作目录中的文件。

最佳实践是仅从一个工作节点写入工件,除非您确实需要多个工作节点的工件。

from ray import train

if train.get_context().get_world_rank() == 0:
    # Only the global rank 0 worker saves artifacts.
    ...

if train.get_context().get_local_rank() == 0:
    # Every local rank 0 worker saves artifacts.
    ...

(已弃用) 设置本地暂存目录#

注意

本节描述的行为取决于 Ray Tune 的实现细节,这些细节不再适用于 Ray Train V2。

警告

在 2.10 之前,环境变量 RAY_AIR_LOCAL_CACHE_DIRRunConfig(local_dir) 是将本地暂存目录配置在主目录(`~/ray_results`)之外的方式。

这些配置不再用于配置本地暂存目录。请改用 RunConfig(storage_path) 来配置您的运行输出的去向。

除了直接写入 storage_path 的检查点等文件外,Ray Train 还在将日志文件和元数据文件持久化(复制/上传)到 storage_path 之前,将其写入一个中间的*本地暂存目录*。每个工作节点的当前工作目录在此本地暂存目录内设置。

默认情况下,本地暂存目录是 Ray 会话目录(例如,/tmp/ray/session_latest)的一个子目录,该目录也是其他临时 Ray 文件转储的位置。

通过设置临时 Ray 会话目录的位置来自定义暂存目录的位置。

这是本地暂存目录外观的示例。

/tmp/ray/session_latest/artifacts/<ray-train-job-timestamp>/
└── experiment_name
    ├── driver_artifacts    <- These are all uploaded to storage periodically
    │   ├── Experiment state snapshot files needed for resuming training
    │   └── Metrics logfiles
    └── working_dirs        <- These are uploaded to storage if `SyncConfig(sync_artifacts=True)`
        └── Current working directory of training workers, which contains worker artifacts

警告

您通常不需要查看本地暂存目录。 storage_path 应该是您需要交互的唯一路径。

本地暂存目录的结构可能会在 Ray Train 的未来版本中发生变化 – 请不要在您的应用程序中依赖这些本地暂存文件。