Distributed training and inference with XGBoost and LightGBM on Ray#


Note: The API shown in this notebook is now deprecated. Please refer to the updated API in the Getting Started with Distributed Training using XGBoost guide instead.

In this tutorial, you'll learn how to scale out data preprocessing, training, and inference with XGBoost and LightGBM on Ray.

To run this tutorial, install the following dependencies:

pip install -qU "ray[data,train]" xgboost lightgbm

Next, we need a few imports:

from typing import Tuple

import pandas as pd
import xgboost

import ray
from ray.data import Dataset, Preprocessor
from ray.data.preprocessors import StandardScaler
from ray.train import Checkpoint, CheckpointConfig, Result, RunConfig, ScalingConfig
from ray.train.xgboost import XGBoostTrainer

Next, we define a function that loads our train, validation, and test sets.

def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    """Load and split the dataset into train, validation, and test sets."""
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
    train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
    test_dataset = valid_dataset.drop_columns(["target"])
    return train_dataset, valid_dataset, test_dataset

How to preprocess data for training?#

Preprocessing is a crucial step in preparing data for training, especially for tabular datasets. Ray Data offers built-in preprocessors that simplify common feature preprocessing tasks, particularly for tabular data. These integrate seamlessly with Ray Datasets, allowing you to preprocess your data in a fault-tolerant and distributed way before training. Here's how:

# Load and split the dataset
train_dataset, valid_dataset, _test_dataset = prepare_data()

# pick some dataset columns to scale
columns_to_scale = ["mean radius", "mean texture"]

# Initialize the preprocessor
preprocessor = StandardScaler(columns=columns_to_scale)
# train the preprocessor on the training set
preprocessor.fit(train_dataset)
# apply the preprocessor to the training and validation sets
train_dataset = preprocessor.transform(train_dataset)
valid_dataset = preprocessor.transform(valid_dataset)
2025-02-07 16:30:44,905	INFO worker.py:1841 -- Started a local Ray instance.
2025-02-07 16:30:45,596	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:45,596	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AggregateNumRows[AggregateNumRows]
2025-02-07 16:30:46,367	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:46,367	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
2025-02-07 16:30:46,729	INFO dataset.py:2704 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2025-02-07 16:30:46,730	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:46,730	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=1]

Save and load XGBoost and LightGBM checkpoints#

Checkpointing is a powerful feature. It is particularly useful for long-running training sessions, as it lets you resume training from the last checkpoint in case of interruptions. XGBoostTrainer and LightGBMTrainer both implement checkpointing out of the box. These checkpoints can be loaded into memory with the static methods XGBoostTrainer.get_model and LightGBMTrainer.get_model.
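
For example, after training you can load the booster back from the latest checkpoint. The following is a minimal sketch; result refers to the ray.train.Result object returned by trainer.fit() later in this tutorial:

# Minimal sketch: restore the trained booster from the latest checkpoint.
booster = XGBoostTrainer.get_model(result.checkpoint)
print(booster.num_boosted_rounds())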

The only change needed is to configure CheckpointConfig to set the checkpointing frequency. For example, the following configuration saves a checkpoint every 10 boosting rounds and only keeps the latest checkpoint.

# Configure checkpointing to save progress during training
run_config = RunConfig(
    checkpoint_config=CheckpointConfig(
        # Checkpoint every 10 iterations.
        checkpoint_frequency=10,
        # Only keep the latest checkpoint and delete the others.
        num_to_keep=1,
    )
    ## If running in a multi-node cluster, this is where you
    ## should configure the run's persistent storage that is accessible
    ## across all worker nodes with `storage_path="s3://..."`
)

Basic training with tree-based models in Train#

Just as in the original xgboost.train() and lightgbm.train() functions, the training parameters are passed as the params dictionary.

XGBoost Example#

# Set up the XGBoost trainer with the specified configuration
trainer = XGBoostTrainer(
    # see "How to scale out training?" for more details
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration. Set to True to schedule GPU workers.
        use_gpu=False,
    ),
    label_column="target",
    num_boost_round=20,
    # XGBoost specific params (see the `xgboost.train` API reference)
    params={
        "objective": "binary:logistic",
        # uncomment this and set `use_gpu=True` to use GPU for training
        # "tree_method": "gpu_hist",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    # store the preprocessor in the checkpoint for inference later
    metadata={"preprocessor_pkl": preprocessor.serialize()},
    run_config=run_config,
)
result = trainer.fit()
2025-02-07 16:32:31,783	INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949
== Status ==
Current time: 2025-02-07 16:32:31 (running for 00:00:00.11)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/12 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-02-07_16-30-44_167214_9631/artifacts/2025-02-07_16-32-31/XGBoostTrainer_2025-02-07_16-32-31/driver_artifacts
Number of trials: 1/1 (1 PENDING)
2025-02-07 16:32:34,045	WARNING experiment_state.py:206 -- Experiment state snapshotting has been triggered multiple times in the last 5.0 seconds and may become a bottleneck. A snapshot is forced if `CheckpointConfig(num_to_keep)` is set, and a trial has checkpointed >= `num_to_keep` times since the last snapshot.
You may want to consider increasing the `CheckpointConfig(num_to_keep)` or decreasing the frequency of saving checkpoints.
You can suppress this warning by setting the environment variable TUNE_WARN_EXCESSIVE_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S to a smaller value than the current threshold (5.0). Set it to 0 to completely suppress this warning.
2025-02-07 16:32:34,105	WARNING experiment_state.py:206 -- Experiment state snapshotting has been triggered multiple times in the last 5.0 seconds and may become a bottleneck. A snapshot is forced if `CheckpointConfig(num_to_keep)` is set, and a trial has checkpointed >= `num_to_keep` times since the last snapshot.
You may want to consider increasing the `CheckpointConfig(num_to_keep)` or decreasing the frequency of saving checkpoints.
You can suppress this warning by setting the environment variable TUNE_WARN_EXCESSIVE_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S to a smaller value than the current threshold (5.0). Set it to 0 to completely suppress this warning.
2025-02-07 16:32:35,137	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/Users/rdecal/ray_results/XGBoostTrainer_2025-02-07_16-32-31' in 0.0110s.
2025-02-07 16:32:35,140	INFO tune.py:1041 -- Total run time: 3.36 seconds (3.34 seconds for the tuning loop).
== Status ==
Current time: 2025-02-07 16:32:35 (running for 00:00:03.35)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/12 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-02-07_16-30-44_167214_9631/artifacts/2025-02-07_16-32-31/XGBoostTrainer_2025-02-07_16-32-31/driver_artifacts
Number of trials: 1/1 (1 TERMINATED)

Now we can view the model's metrics:

print(result.metrics)

The output should look similar to the following:

{'train-logloss': 0.00587594546605992, 'train-error': 0.0, 'valid-logloss': 0.06215000962556052, 'valid-error': 0.02941176470588235, 'time_this_iter_s': 0.0101318359375, 'should_checkpoint': True, 'done': True, 'training_iteration': 101, 'trial_id': '40fed_00000', 'date': '2023-07-06_18-33-25', 'timestamp': 1688693605, 'time_total_s': 4.901317834854126, 'pid': 40725, 'hostname': 'Balajis-MacBook-Pro-16', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 4.901317834854126, 'iterations_since_restore': 101, 'experiment_tag': '0'}
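
Individual fields can be read directly off the Result object. For example (a minimal sketch):

# Minimal sketch: access specific fields of the Result returned by trainer.fit()
print(result.metrics["valid-logloss"])  # final validation log loss
print(result.metrics["valid-error"])    # final validation error rate
print(result.checkpoint)                # latest checkpoint saved during training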

Tip

Once checkpointing is enabled, you can follow this guide to enable fault tolerance.

LightGBM Example#

Modifying this example to use LightGBM instead of XGBoost is straightforward. You only need to change the trainer class and the model-specific parameters:

- from ray.train.xgboost import XGBoostTrainer
+ from ray.train.lightgbm import LightGBMTrainer

- trainer = XGBoostTrainer(
+ trainer = LightGBMTrainer(

- "objective": "binary:logistic",
+ "objective": "binary",
- "eval_metric": ["logloss", "error"],
+ "metric": ["binary_logloss", "binary_error"],

Run inference with a trained tree-based model#

Now that we have a trained model, we can use it to make predictions on new data. Let's define a utility function that performs streaming, distributed batch inference with the trained model.

class Predict:
    def __init__(self, checkpoint: Checkpoint):
        self.model = XGBoostTrainer.get_model(checkpoint)
        # extract the preprocessor from the checkpoint metadata
        self.preprocessor = Preprocessor.deserialize(
            checkpoint.get_metadata()["preprocessor_pkl"]
        )

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        preprocessed_batch = self.preprocessor.transform_batch(batch)
        dmatrix = xgboost.DMatrix(preprocessed_batch)
        return {"predictions": self.model.predict(dmatrix)}


def predict_xgboost(result: Result):
    _, _, test_dataset = prepare_data()

    scores = test_dataset.map_batches(
        Predict,
        fn_constructor_args=[result.checkpoint],
        concurrency=1,
        batch_format="pandas",
    )

    predicted_labels = scores.map_batches(
        lambda df: (df > 0.5).astype(int), batch_format="pandas"
    )
    print("PREDICTED LABELS")
    predicted_labels.show()

Now we can get the model's predictions on the test set:

predict_xgboost(result)
2025-02-07 16:30:52,878	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:52,878	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AggregateNumRows[AggregateNumRows]
2025-02-07 16:30:53,241	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:53,241	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
2025-02-07 16:30:53,559	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:53,559	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(drop_columns)->MapBatches(Predict)] -> TaskPoolMapOperator[MapBatches(<lambda>)] -> LimitOperator[limit=20]
PREDICTED LABELS
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}

The output should look similar to the following:

PREDICTED LABELS
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}

How to scale out training?#

One of the key advantages of using Ray Train is how easily you can scale your training workloads. By adjusting the ScalingConfig, you can optimize resource utilization and reduce training time, making it ideal for large-scale machine learning tasks.

Note

Ray Train doesn't modify or otherwise alter the working of the underlying XGBoost or LightGBM distributed training algorithms. Ray only provides orchestration, data ingest, and fault tolerance. For more information on GBDT distributed training, refer to the XGBoost documentation and the LightGBM documentation.

Multi-node CPU Example#

Setup: 4 nodes with 8 CPUs each.

Use case: To utilize all resources in multi-node training.

scaling_config = ScalingConfig(
    num_workers=4,
    resources_per_worker={"CPU": 8},
)

Single-node multi-GPU Example#

Setup: 1 node with 8 CPUs and 4 GPUs.

Use case: If you have a single node with multiple GPUs, you need distributed training to leverage all of them.

scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
)

Multi-node multi-GPU Example#

Setup: 4 nodes each with 8 CPUs and 4 GPUs.

Use case: If you have multiple nodes with multiple GPUs, you need to schedule one worker per GPU.

scaling_config = ScalingConfig(
    num_workers=16,
    use_gpu=True,
)

Note that you just have to adjust the number of workers; Ray handles everything else automatically.

Warning

Specifying a shared storage location (such as cloud storage or NFS) is optional for single-node clusters, but it is required for multi-node clusters. Using a local path raises an error during checkpointing for multi-node clusters.

trainer = XGBoostTrainer(
    ..., run_config=ray.train.RunConfig(storage_path="s3://...")
)

How many remote actors should you use?#

This depends on your workload and your cluster setup. Generally there is no inherent benefit of running more than one remote actor per node for CPU-only training, because XGBoost can already leverage multiple CPUs with threading.

However, in some cases you should consider starting more than one actor per node:

  • For multi-GPU training, each GPU should have a separate remote actor. Thus, if your machine has 24 CPUs and 4 GPUs, you should start 4 remote actors with 6 CPUs and 1 GPU each.

  • In a heterogeneous cluster, you might want to find the greatest common divisor for the number of CPUs. For example, for a cluster with three nodes of 4, 8, and 12 CPUs, respectively, you should set the number of actors to 6 and the CPUs per actor to 4. Both cases are illustrated in the sketches after this list.
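
Minimal sketches of the corresponding ScalingConfig objects for these two cases:

# Sketch: one actor per GPU on a machine with 24 CPUs and 4 GPUs.
scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
    resources_per_worker={"CPU": 6},
)

# Sketch: heterogeneous cluster with 4, 8, and 12 CPUs -> 6 actors with 4 CPUs each.
scaling_config = ScalingConfig(
    num_workers=6,
    resources_per_worker={"CPU": 4},
)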

How to use GPUs for training?#

Ray Train enables multi-GPU training for XGBoost and LightGBM. The core backends automatically leverage NCCL2 for cross-device communication. All you have to do is start one actor per GPU and set GPU-compatible parameters, for example XGBoost's tree_method to gpu_hist. See the XGBoost documentation for more details.

For instance, if you have 2 machines with 4 GPUs each, you want to start 8 workers and set use_gpu=True. There is usually no benefit in allocating less (for example, 0.5) or more than one GPU per actor.

You should divide the CPUs evenly across actors per machine, so if your machines have 16 CPUs in addition to the 4 GPUs, each actor should use 4 CPUs.

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration.
        use_gpu=True,
    ),
    params={
        # XGBoost specific params
        "tree_method": "gpu_hist",
        "eval_metric": ["logloss", "error"],
    },
    ...
)
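
For the setup described above with 2 machines and 4 GPUs each, the scaling configuration might look like this sketch:

# Sketch: 2 machines x 4 GPUs -> 8 workers, each using 1 GPU and 4 of the 16 CPUs per machine.
scaling_config = ScalingConfig(
    num_workers=8,
    use_gpu=True,
    resources_per_worker={"CPU": 4},
)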

How to optimize XGBoost memory usage?#

XGBoost uses a compute-optimized data structure called DMatrix to store the training data. Converting a dataset to a DMatrix, however, requires storing a complete copy of the data as well as intermediate conversions. On a 64-bit system the format is 64-bit floats, so depending on the system and the original dataset's dtype, this matrix can occupy more memory than the original dataset.

Assuming dtype float32 on a 64-bit system, the peak memory usage for CPU-based training is at least 3x the size of the dataset, plus about 400,000 KiB for other resources such as operating system requirements and storing intermediate results.

Example#

  • Machine type: AWS m5.xlarge (4 vCPUs, 16 GiB RAM)

  • Usable RAM: ~15,350,000 KiB

  • Dataset: 1,250,000 rows with 1024 features, dtype float32. Total size: 5,000,000 KiB

  • XGBoost DMatrix size: ~10,000,000 KiB

This dataset just fits on this node for training.
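
The numbers above follow from a quick back-of-the-envelope calculation (a minimal sketch, using 1 KiB = 1024 bytes):

rows, features = 1_250_000, 1024
dataset_kib = rows * features * 4 / 1024  # float32 copy: ~5,000,000 KiB
dmatrix_kib = rows * features * 8 / 1024  # 64-bit DMatrix: ~10,000,000 KiB
peak_kib = 3 * dataset_kib + 400_000      # ~15,400,000 KiB, roughly the usable RAM of the node above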

Note that the DMatrix size might be smaller on 32-bit systems.

GPU#

Generally, the same memory requirements apply to GPU-based training. Additionally, the GPU must have enough memory to hold the dataset.

In the preceding example, the GPU must have at least 10,000,000 KiB (about 9.6 GiB) of memory. However, empirical data shows that using a DeviceQuantileDMatrix seems to result in somewhat higher peak GPU memory usage, possibly due to intermediate storage used when loading data (about 10%).

Best practices#

To reduce peak memory usage, consider the following suggestions:

  • Store the data as float32 or lower precision. You often don't need more precision, and keeping the data in a smaller format helps reduce peak memory usage during initial data loading.

  • Pass the dtype when loading data from CSV. Otherwise, floating point values are loaded as np.float64 by default, increasing peak memory usage by 33%. The sketch below shows one way to do this.
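
For example, you can pass pyarrow CSV conversion options through ray.data.read_csv to load feature columns as float32. This is a sketch and assumes the pyarrow convert_options pass-through of ray.data.read_csv; the column names come from the dataset used earlier in this tutorial:

import pyarrow as pa
from pyarrow import csv

import ray

# Sketch: load selected columns as float32 instead of the default float64.
dataset = ray.data.read_csv(
    "s3://anonymous@air-example-data/breast_cancer.csv",
    convert_options=csv.ConvertOptions(
        column_types={"mean radius": pa.float32(), "mean texture": pa.float32()},
    ),
)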