使用 LightGBM 进行分布式训练入门#

本教程将介绍如何将现有的 LightGBM 脚本转换为使用 Ray Train。

了解如何

  1. 配置 训练函数 以报告指标和保存检查点。

  2. 为训练作业配置 缩放 以及 CPU 或 GPU 资源需求。

  3. 使用 LightGBMTrainer 启动分布式训练作业。

快速入门#

作为参考,最终代码将类似于:

import ray.train
from ray.train.lightgbm import LightGBMTrainer

def train_func():
    # Your LightGBM training code here.
    ...

scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 4})
trainer = LightGBMTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
  1. train_func 是在每个分布式训练工作节点上执行的 Python 代码。

  2. ScalingConfig 定义了分布式训练工作节点的数量以及是否使用 GPU。

  3. LightGBMTrainer 启动分布式训练作业。

比较使用 Ray Train 和不使用 Ray Train 的 LightGBM 训练脚本。

import lightgbm as lgb

import ray.train
from ray.train.lightgbm import LightGBMTrainer, RayTrainReportCallback

# 1. Load your data as a Ray Data Dataset.
train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/train")
eval_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/val")


def train_func():
    # 2. Load your data shard as a `lightgbm.Dataset`.

    # Get dataset shards for this worker
    train_shard = ray.train.get_dataset_shard("train")
    eval_shard = ray.train.get_dataset_shard("eval")

    # Convert shards to pandas DataFrames
    train_df = train_shard.materialize().to_pandas()
    eval_df = eval_shard.materialize().to_pandas()

    train_X = train_df.drop("target", axis=1)
    train_y = train_df["target"]
    eval_X = eval_df.drop("target", axis=1)
    eval_y = eval_df["target"]

    train_set = lgb.Dataset(train_X, label=train_y)
    eval_set = lgb.Dataset(eval_X, label=eval_y)

    # 3. Define your LightGBM model training parameters.
    params = {
        "objective": "multiclass",
        "num_class": 3,
        "metric": ["multi_logloss", "multi_error"],
        "verbosity": -1,
        "boosting_type": "gbdt",
        "num_leaves": 31,
        "learning_rate": 0.05,
        "feature_fraction": 0.9,
        "bagging_fraction": 0.8,
        "bagging_freq": 5,
        # Adding the lines below are the only changes needed
        # for your `lgb.train` call!
        "tree_learner": "data_parallel",
        "pre_partition": True,
        **ray.train.lightgbm.get_network_params(),
    }

    # 4. Do distributed data-parallel training.
    # Ray Train sets up the necessary coordinator processes and
    # environment variables for your workers to communicate with each other.
    model = lgb.train(
        params,
        train_set,
        valid_sets=[eval_set],
        valid_names=["eval"],
        num_boost_round=100,
        # Optional: Use the `RayTrainReportCallback` to save and report checkpoints.
        callbacks=[RayTrainReportCallback()],
    )


# 5. Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 2})

# 6. Launch distributed training job.
trainer = LightGBMTrainer(
    train_func,
    scaling_config=scaling_config,
    datasets={"train": train_dataset, "eval": eval_dataset},
)
result = trainer.fit()

# 7. Load the trained model.
model = RayTrainReportCallback.get_model(result.checkpoint)
import pandas as pd
import lightgbm as lgb

# 1. Load your data as a `lightgbm.Dataset`.
train_df = pd.read_csv("s3://ray-example-data/iris/train/1.csv")
eval_df = pd.read_csv("s3://ray-example-data/iris/val/1.csv")

train_X = train_df.drop("target", axis=1)
train_y = train_df["target"]
eval_X = eval_df.drop("target", axis=1)
eval_y = eval_df["target"]

train_set = lgb.Dataset(train_X, label=train_y)
eval_set = lgb.Dataset(eval_X, label=eval_y)

# 2. Define your LightGBM model training parameters.
params = {
    "objective": "multiclass",
    "num_class": 3,
    "metric": ["multi_logloss", "multi_error"],
    "verbosity": -1,
    "boosting_type": "gbdt",
    "num_leaves": 31,
    "learning_rate": 0.05,
    "feature_fraction": 0.9,
    "bagging_fraction": 0.8,
    "bagging_freq": 5,
}

# 3. Do non-distributed training.
model = lgb.train(
    params,
    train_set,
    valid_sets=[eval_set],
    valid_names=["eval"],
    num_boost_round=100,
)

设置训练函数#

首先,更新您的训练代码以支持分布式训练。从将您的原生scikit-learn Estimator LightGBM 训练代码包装到训练函数开始。

def train_func():
    # Your native LightGBM training code here.
    train_set = ...
    lightgbm.train(...)

每个分布式训练工作节点都会执行此函数。

您也可以通过 Trainer 的 train_loop_configtrain_func 的输入参数指定为字典。例如

def train_func(config):
    label_column = config["label_column"]
    num_boost_round = config["num_boost_round"]
    ...

config = {"label_column": "target", "num_boost_round": 100}
trainer = ray.train.lightgbm.LightGBMTrainer(train_func, train_loop_config=config, ...)

警告

避免通过 train_loop_config 传递大型数据对象,以减少序列化和反序列化开销。而是直接在 train_func 中初始化大型对象(例如数据集、模型)。

 def load_dataset():
     # Return a large in-memory dataset
     ...

 def load_model():
     # Return a large in-memory model instance
     ...

-config = {"data": load_dataset(), "model": load_model()}

 def train_func(config):
-    data = config["data"]
-    model = config["model"]

+    data = load_dataset()
+    model = load_model()
     ...

 trainer = ray.train.lightgbm.LightGBMTrainer(train_func, train_loop_config=config, ...)

配置分布式训练参数#

要启用分布式 LightGBM 训练,请使用 ray.train.lightgbm.get_network_params() 将网络通信参数添加到您的训练配置中。此函数会自动配置必要的网络设置以进行工作进程通信。

 def train_func():
     ...
     params = {
         # Your LightGBM training parameters here
         ...
+        "tree_learner": "data_parallel",
+        "pre_partition": True,
+        **ray.train.lightgbm.get_network_params(),
     }

     model = lightgbm.train(
         params,
         ...
     )
     ...

注意

确保设置 tree_learner 以启用分布式训练。有关更多详细信息,请参阅 LightGBM 文档。如果您使用 Ray Data 加载和分片您的数据集,如快速入门示例所示,您还应该设置 pre_partition=True

报告指标和保存检查点#

要持久化您的检查点并监控训练进度,请将 ray.train.lightgbm.RayTrainReportCallback 实用回调添加到您的 Trainer 中。

import lightgbm
from ray.train.lightgbm import RayTrainReportCallback

def train_func():
    ...
    bst = lightgbm.train(
        ...,
        callbacks=[
            RayTrainReportCallback(
                metrics=["eval-multi_logloss"], frequency=1
            )
        ],
    )
    ...

将指标和检查点报告给 Ray Train,可以实现容错训练并与 Ray Tune 集成。

加载数据#

在运行分布式 LightGBM 训练时,每个工作进程应使用数据集的不同分片。

def get_train_dataset(world_rank: int) -> lightgbm.Dataset:
    # Define logic to get the Dataset shard for this worker rank
    ...

def get_eval_dataset(world_rank: int) -> lightgbm.Dataset:
    # Define logic to get the Dataset for each worker
    ...

def train_func():
    rank = ray.train.get_world_rank()
    train_set = get_train_dataset(rank)
    eval_set = get_eval_dataset(rank)
    ...

一种常见的方法是对数据集进行预分片,然后为每个工作进程分配不同的文件进行读取。

对数据集进行预分片对工作进程数量的变化不够灵活,因为某些工作进程可能被分配比其他工作进程更多的数据。为了更灵活,Ray Data 提供了在运行时分片数据集的解决方案。

使用 Ray Data 对数据集进行分片#

Ray Data 是一个分布式数据处理库,可让您轻松地在多个工作进程之间对数据进行分片和分发。

首先,将您的全部数据集加载为 Ray Data Dataset。有关如何从不同来源加载和预处理数据的更多详细信息,请参阅Ray Data 快速入门

train_dataset = ray.data.read_parquet("s3://path/to/entire/train/dataset/dir")
eval_dataset = ray.data.read_parquet("s3://path/to/entire/eval/dataset/dir")

在训练函数中,您可以使用 ray.train.get_dataset_shard() 访问此工作进程的数据集分片。将其转换为原生的 lightgbm.Dataset

def get_dataset(dataset_name: str) -> lightgbm.Dataset:
    shard = ray.train.get_dataset_shard(dataset_name)
    df = shard.materialize().to_pandas()
    X, y = df.drop("target", axis=1), df["target"]
    return lightgbm.Dataset(X, label=y)

def train_func():
    train_set = get_dataset("train")
    eval_set = get_dataset("eval")
    ...

最后,将数据集传递给 Trainer。这将自动将数据集分片到工作进程。这些键必须与在训练函数中调用 get_dataset_shard 时使用的键匹配。

trainer = LightGBMTrainer(..., datasets={"train": train_dataset, "eval": eval_dataset})
trainer.fit()

有关更多详细信息,请参阅 数据加载和预处理

配置缩放和 GPU#

在训练函数之外,创建一个 ScalingConfig 对象来配置

  1. num_workers - 分布式训练工作节点的数量。

  2. use_gpu - 每个工作节点是否应使用 GPU(或 CPU)。

  3. resources_per_worker - 每个工作进程的 CPU 或 GPU 数量。

from ray.train import ScalingConfig

# 4 nodes with 8 CPUs each.
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 8})

注意

当使用 Ray Data 和 Ray Train 时,请注意不要通过 resources_per_worker 参数请求您集群中的所有可用 CPU。Ray Data 需要 CPU 资源来并行执行数据预处理操作。如果所有 CPU 都分配给训练工作进程,Ray Data 操作可能会成为瓶颈,导致性能下降。一个好的做法是为 Ray Data 操作留出一些 CPU 资源。

例如,如果您的集群每个节点有 8 个 CPU,您可以为训练工作进程分配 6 个 CPU,为 Ray Data 留下 2 个 CPU。

# Allocate 6 CPUs per worker, leaving resources for Ray Data operations
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 6})

为了使用 GPU,您需要在 ScalingConfig 对象中将 use_gpu 参数设置为 True。这将为每个工作进程请求并分配一个 GPU。

# 1 node with 8 CPUs and 4 GPUs each.
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)

# 4 nodes with 8 CPUs and 4 GPUs each.
scaling_config = ScalingConfig(num_workers=16, use_gpu=True)

使用 GPU 时,您还需要更新训练函数以使用分配的 GPU。这可以通过将 "device" 参数设置为 "gpu" 来完成。有关 LightGBM GPU 支持的更多详细信息,请参阅 LightGBM GPU 文档

  def train_func():
      ...

      params = {
          ...,
+         "device": "gpu",
      }

      bst = lightgbm.train(
          params,
          ...
      )

配置持久存储#

创建一个 RunConfig 对象来指定结果(包括检查点和工件)将要保存的路径。

from ray.train import RunConfig

# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")

# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")

# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")

警告

指定一个*共享存储位置*(如云存储或 NFS)对于单节点集群是*可选的*,但对于多节点集群是*必需的*。使用本地路径将在多节点集群的检查点过程中*引发错误*。

有关更多详细信息,请参阅 配置持久存储

启动训练作业#

总而言之,您现在可以使用 LightGBMTrainer 启动分布式训练作业。

from ray.train.lightgbm import LightGBMTrainer

trainer = LightGBMTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()

访问训练结果#

训练完成后,将返回一个 Result 对象,其中包含有关训练运行的信息,包括训练期间报告的指标和检查点。

result.metrics     # The metrics reported during training.
result.checkpoint  # The latest checkpoint reported during training.
result.path        # The path where logs are stored.
result.error       # The exception that was raised, if training failed.

有关更多用法示例,请参阅 检查训练结果

下一步#

在将 LightGBM 训练脚本转换为使用 Ray Train 后

  • 请参阅 用户指南 以了解有关执行特定任务的更多信息。

  • 浏览 示例 以获取有关如何使用 Ray Train 的端到端示例。

  • 请参阅 API 参考,了解本教程中的类和方法的更多详细信息。