使用 XGBoost 开始分布式训练#

本教程将引导您完成将现有 XGBoost 脚本转换为使用 Ray Train 的过程。

了解如何

  1. 配置训练函数以报告指标和保存检查点。

  2. 配置训练作业的扩缩容以及 CPU 或 GPU 资源需求。

  3. 使用 XGBoostTrainer 启动分布式训练作业。

快速入门#

作为参考,最终代码看起来会像这样

import ray.train
from ray.train.xgboost import XGBoostTrainer

def train_func():
    # Your XGBoost training code here.
    ...

scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 4})
trainer = XGBoostTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
  1. train_func 是在每个分布式训练 Worker 上执行的 Python 代码。

  2. ScalingConfig 定义了分布式训练 Worker 的数量以及是否使用 GPU。

  3. XGBoostTrainer 启动分布式训练作业。

比较使用 Ray Train 和不使用 Ray Train 的 XGBoost 训练脚本。

import xgboost

import ray.train
from ray.train.xgboost import XGBoostTrainer, RayTrainReportCallback

# 1. Load your data as a Ray Data Dataset.
train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/train")
eval_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/val")


def train_func():
    # 2. Load your data shard as an `xgboost.DMatrix`.

    # Get dataset shards for this worker
    train_shard = ray.train.get_dataset_shard("train")
    eval_shard = ray.train.get_dataset_shard("eval")

    # Convert shards to pandas DataFrames
    train_df = train_shard.materialize().to_pandas()
    eval_df = eval_shard.materialize().to_pandas()

    train_X = train_df.drop("target", axis=1)
    train_y = train_df["target"]
    eval_X = eval_df.drop("target", axis=1)
    eval_y = eval_df["target"]

    dtrain = xgboost.DMatrix(train_X, label=train_y)
    deval = xgboost.DMatrix(eval_X, label=eval_y)

    # 3. Define your xgboost model training parameters.
    params = {
        "tree_method": "approx",
        "objective": "reg:squarederror",
        "eta": 1e-4,
        "subsample": 0.5,
        "max_depth": 2,
    }

    # 4. Do distributed data-parallel training.
    # Ray Train sets up the necessary coordinator processes and
    # environment variables for your workers to communicate with each other.
    bst = xgboost.train(
        params,
        dtrain=dtrain,
        evals=[(deval, "validation")],
        num_boost_round=10,
        # Optional: Use the `RayTrainReportCallback` to save and report checkpoints.
        callbacks=[RayTrainReportCallback()],
    )


# 5. Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 2})

# 6. Launch distributed training job.
trainer = XGBoostTrainer(
    train_func,
    scaling_config=scaling_config,
    datasets={"train": train_dataset, "eval": eval_dataset},
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()

# 7. Load the trained model
import os

with result.checkpoint.as_directory() as checkpoint_dir:
    model_path = os.path.join(checkpoint_dir, RayTrainReportCallback.CHECKPOINT_NAME)
    model = xgboost.Booster()
    model.load_model(model_path)
import pandas as pd
import xgboost

# 1. Load your data as an `xgboost.DMatrix`.
train_df = pd.read_csv("s3://ray-example-data/iris/train/1.csv")
eval_df = pd.read_csv("s3://ray-example-data/iris/val/1.csv")

train_X = train_df.drop("target", axis=1)
train_y = train_df["target"]
eval_X = eval_df.drop("target", axis=1)
eval_y = eval_df["target"]

dtrain = xgboost.DMatrix(train_X, label=train_y)
deval = xgboost.DMatrix(eval_X, label=eval_y)

# 2. Define your xgboost model training parameters.
params = {
    "tree_method": "approx",
    "objective": "reg:squarederror",
    "eta": 1e-4,
    "subsample": 0.5,
    "max_depth": 2,
}

# 3. Do non-distributed training.
bst = xgboost.train(
    params,
    dtrain=dtrain,
    evals=[(deval, "validation")],
    num_boost_round=10,
)

设置训练函数#

首先,更新您的训练代码以支持分布式训练。将您的 nativescikit-learn estimator XGBoost 训练代码封装在一个 训练函数中。

def train_func():
    # Your native XGBoost training code here.
    dmatrix = ...
    xgboost.train(...)

每个分布式训练 Worker 都会执行此函数。

您还可以通过 Trainer 的 train_loop_configtrain_func 的输入参数指定为字典。例如

def train_func(config):
    label_column = config["label_column"]
    num_boost_round = config["num_boost_round"]
    ...

config = {"label_column": "y", "num_boost_round": 10}
trainer = ray.train.xgboost.XGBoostTrainer(train_func, train_loop_config=config, ...)

警告

避免通过 train_loop_config 传递大型数据对象,以减少序列化和反序列化开销。相反,直接在 train_func 中初始化大型对象(例如数据集、模型)。

 def load_dataset():
     # Return a large in-memory dataset
     ...

 def load_model():
     # Return a large in-memory model instance
     ...

-config = {"data": load_dataset(), "model": load_model()}

 def train_func(config):
-    data = config["data"]
-    model = config["model"]

+    data = load_dataset()
+    model = load_model()
     ...

 trainer = ray.train.xgboost.XGBoostTrainer(train_func, train_loop_config=config, ...)

Ray Train 会自动执行分布式 XGBoost 训练所需的 Worker 通信设置。

报告指标和保存检查点#

为了持久化检查点和监控训练进度,请为您的 Trainer 添加一个 ray.train.xgboost.RayTrainReportCallback 工具回调。

import xgboost
from ray.train.xgboost import RayTrainReportCallback

def train_func():
    ...
    bst = xgboost.train(
        ...,
        callbacks=[
            RayTrainReportCallback(
                metrics=["eval-logloss"], frequency=1
            )
        ],
    )
    ...

向 Ray Train 报告指标和检查点可以启用容错训练,并与 Ray Tune 集成。

加载数据#

运行分布式 XGBoost 训练时,每个 Worker 应使用数据集的不同分片。

def get_train_dataset(world_rank: int) -> xgboost.DMatrix:
    # Define logic to get the DMatrix shard for this worker rank
    ...

def get_eval_dataset(world_rank: int) -> xgboost.DMatrix:
    # Define logic to get the DMatrix for each worker
    ...

def train_func():
    rank = ray.train.get_world_rank()
    dtrain = get_train_dataset(rank)
    deval = get_eval_dataset(rank)
    ...

一种常见方法是预先对数据集进行分片,然后为每个 Worker 分配一组不同的文件来读取。

预先分片数据集对于 Worker 数量的变化不够灵活,因为一些 Worker 可能会被分配比其他 Worker 更多的数据。为了获得更高的灵活性,Ray Data 提供了一种在运行时对数据集进行分片的解决方案。

使用 Ray Data 对数据集进行分片#

Ray Data 是一个分布式数据处理库,可让您轻松地将数据分片并分布到多个 Worker 上。

首先,将您的整个数据集作为 Ray Data Dataset 加载。有关如何从不同来源加载和预处理数据的更多详细信息,请参阅Ray Data 快速入门

train_dataset = ray.data.read_parquet("s3://path/to/entire/train/dataset/dir")
eval_dataset = ray.data.read_parquet("s3://path/to/entire/eval/dataset/dir")

在训练函数中,您可以使用 ray.train.get_dataset_shard() 访问此 Worker 的数据集分片。将其转换为 native xgboost.DMatrix

def get_dmatrix(dataset_name: str) -> xgboost.DMatrix:
    shard = ray.train.get_dataset_shard(dataset_name)
    df = shard.materialize().to_pandas()
    X, y = df.drop("target", axis=1), df["target"]
    return xgboost.DMatrix(X, label=y)

def train_func():
    dtrain = get_dmatrix("train")
    deval = get_dmatrix("eval")
    ...

最后,将数据集传递给 Trainer。这将自动在 Worker 之间分片数据集。这些键必须与在训练函数中调用 get_dataset_shard 时使用的键匹配。

trainer = XGBoostTrainer(..., datasets={"train": train_dataset, "eval": eval_dataset})
trainer.fit()

更多详细信息,请参阅数据加载和预处理

配置规模和 GPU#

在训练函数外部,创建一个 ScalingConfig 对象来配置

  1. num_workers - 分布式训练 Worker 进程的数量。

  2. use_gpu - 每个 Worker 是否应使用 GPU(或 CPU)。

  3. resources_per_worker - 每个 Worker 的 CPU 或 GPU 数量。

from ray.train import ScalingConfig

# 4 nodes with 8 CPUs each.
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 8})

注意

将 Ray Data 与 Ray Train 一起使用时,请注意不要使用 resources_per_worker 参数请求集群中所有可用的 CPU。Ray Data 需要 CPU 资源来并行执行数据预处理操作。如果所有 CPU 都分配给训练 Worker,Ray Data 操作可能会成为瓶颈,导致性能下降。一个好的实践是保留一部分 CPU 资源供 Ray Data 操作使用。

例如,如果您的集群每个节点有 8 个 CPU,您可以分配 6 个 CPU 给训练 Worker,并保留 2 个 CPU 给 Ray Data

# Allocate 6 CPUs per worker, leaving resources for Ray Data operations
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 6})

要使用 GPU,您需要在 ScalingConfig 对象中将 use_gpu 参数设置为 True。这将为每个 Worker 请求并分配一个 GPU。

# 4 nodes with 8 CPUs and 4 GPUs each.
scaling_config = ScalingConfig(num_workers=16, use_gpu=True)

使用 GPU 时,您还需要更新训练函数以使用分配的 GPU。这可以通过将 "device" 参数设置为 "cuda" 来完成。有关 XGBoost GPU 支持的更多详细信息,请参阅 XGBoost GPU 文档

  def train_func():
      ...

      params = {
          ...,
+         "device": "cuda",
      }

      bst = xgboost.train(
          params,
          ...
      )

配置持久存储#

创建一个 RunConfig 对象,指定结果(包括检查点和 artifacts)的保存路径。

from ray.train import RunConfig

# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")

# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")

# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")

警告

对于单节点集群,指定共享存储位置(例如云存储或 NFS)是可选的,但对于多节点集群是必需的。对于多节点集群,使用本地路径将在检查点过程中引发错误

更多详细信息,请参阅配置持久存储

启动训练作业#

将所有这些结合起来,您现在可以使用 XGBoostTrainer 启动分布式训练作业了。

from ray.train.xgboost import XGBoostTrainer

trainer = XGBoostTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()

访问训练结果#

训练完成后,将返回一个 Result 对象,其中包含有关训练运行的信息,包括训练期间报告的指标和检查点。

result.metrics     # The metrics reported during training.
result.checkpoint  # The latest checkpoint reported during training.
result.path        # The path where logs are stored.
result.error       # The exception that was raised, if training failed.

更多使用示例,请参阅检查训练结果

后续步骤#

将 XGBoost 训练脚本转换为使用 Ray Train 后

  • 参阅用户指南,了解如何执行特定任务的更多信息。

  • 浏览示例,获取 Ray Train 端到端使用示例。

  • 查阅API 参考,了解本教程中类和方法的更多详细信息。