使用 LightGBM 进行分布式训练入门#
本教程将介绍如何将现有的 LightGBM 脚本转换为使用 Ray Train。
了解如何
配置 训练函数 以报告指标和保存检查点。
为训练作业配置 缩放 以及 CPU 或 GPU 资源需求。
使用
LightGBMTrainer启动分布式训练作业。
快速入门#
作为参考,最终代码将类似于:
import ray.train
from ray.train.lightgbm import LightGBMTrainer
def train_func():
# Your LightGBM training code here.
...
scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 4})
trainer = LightGBMTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
train_func是在每个分布式训练工作节点上执行的 Python 代码。ScalingConfig定义了分布式训练工作节点的数量以及是否使用 GPU。LightGBMTrainer启动分布式训练作业。
比较使用 Ray Train 和不使用 Ray Train 的 LightGBM 训练脚本。
import lightgbm as lgb
import ray.train
from ray.train.lightgbm import LightGBMTrainer, RayTrainReportCallback
# 1. Load your data as a Ray Data Dataset.
train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/train")
eval_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/val")
def train_func():
# 2. Load your data shard as a `lightgbm.Dataset`.
# Get dataset shards for this worker
train_shard = ray.train.get_dataset_shard("train")
eval_shard = ray.train.get_dataset_shard("eval")
# Convert shards to pandas DataFrames
train_df = train_shard.materialize().to_pandas()
eval_df = eval_shard.materialize().to_pandas()
train_X = train_df.drop("target", axis=1)
train_y = train_df["target"]
eval_X = eval_df.drop("target", axis=1)
eval_y = eval_df["target"]
train_set = lgb.Dataset(train_X, label=train_y)
eval_set = lgb.Dataset(eval_X, label=eval_y)
# 3. Define your LightGBM model training parameters.
params = {
"objective": "multiclass",
"num_class": 3,
"metric": ["multi_logloss", "multi_error"],
"verbosity": -1,
"boosting_type": "gbdt",
"num_leaves": 31,
"learning_rate": 0.05,
"feature_fraction": 0.9,
"bagging_fraction": 0.8,
"bagging_freq": 5,
# Adding the lines below are the only changes needed
# for your `lgb.train` call!
"tree_learner": "data_parallel",
"pre_partition": True,
**ray.train.lightgbm.get_network_params(),
}
# 4. Do distributed data-parallel training.
# Ray Train sets up the necessary coordinator processes and
# environment variables for your workers to communicate with each other.
model = lgb.train(
params,
train_set,
valid_sets=[eval_set],
valid_names=["eval"],
num_boost_round=100,
# Optional: Use the `RayTrainReportCallback` to save and report checkpoints.
callbacks=[RayTrainReportCallback()],
)
# 5. Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 2})
# 6. Launch distributed training job.
trainer = LightGBMTrainer(
train_func,
scaling_config=scaling_config,
datasets={"train": train_dataset, "eval": eval_dataset},
)
result = trainer.fit()
# 7. Load the trained model.
model = RayTrainReportCallback.get_model(result.checkpoint)
import pandas as pd
import lightgbm as lgb
# 1. Load your data as a `lightgbm.Dataset`.
train_df = pd.read_csv("s3://ray-example-data/iris/train/1.csv")
eval_df = pd.read_csv("s3://ray-example-data/iris/val/1.csv")
train_X = train_df.drop("target", axis=1)
train_y = train_df["target"]
eval_X = eval_df.drop("target", axis=1)
eval_y = eval_df["target"]
train_set = lgb.Dataset(train_X, label=train_y)
eval_set = lgb.Dataset(eval_X, label=eval_y)
# 2. Define your LightGBM model training parameters.
params = {
"objective": "multiclass",
"num_class": 3,
"metric": ["multi_logloss", "multi_error"],
"verbosity": -1,
"boosting_type": "gbdt",
"num_leaves": 31,
"learning_rate": 0.05,
"feature_fraction": 0.9,
"bagging_fraction": 0.8,
"bagging_freq": 5,
}
# 3. Do non-distributed training.
model = lgb.train(
params,
train_set,
valid_sets=[eval_set],
valid_names=["eval"],
num_boost_round=100,
)
设置训练函数#
首先,更新您的训练代码以支持分布式训练。从将您的原生或scikit-learn Estimator LightGBM 训练代码包装到训练函数开始。
def train_func():
# Your native LightGBM training code here.
train_set = ...
lightgbm.train(...)
每个分布式训练工作节点都会执行此函数。
您也可以通过 Trainer 的 train_loop_config 将 train_func 的输入参数指定为字典。例如
def train_func(config):
label_column = config["label_column"]
num_boost_round = config["num_boost_round"]
...
config = {"label_column": "target", "num_boost_round": 100}
trainer = ray.train.lightgbm.LightGBMTrainer(train_func, train_loop_config=config, ...)
警告
避免通过 train_loop_config 传递大型数据对象,以减少序列化和反序列化开销。而是直接在 train_func 中初始化大型对象(例如数据集、模型)。
def load_dataset():
# Return a large in-memory dataset
...
def load_model():
# Return a large in-memory model instance
...
-config = {"data": load_dataset(), "model": load_model()}
def train_func(config):
- data = config["data"]
- model = config["model"]
+ data = load_dataset()
+ model = load_model()
...
trainer = ray.train.lightgbm.LightGBMTrainer(train_func, train_loop_config=config, ...)
配置分布式训练参数#
要启用分布式 LightGBM 训练,请使用 ray.train.lightgbm.get_network_params() 将网络通信参数添加到您的训练配置中。此函数会自动配置必要的网络设置以进行工作进程通信。
def train_func():
...
params = {
# Your LightGBM training parameters here
...
+ "tree_learner": "data_parallel",
+ "pre_partition": True,
+ **ray.train.lightgbm.get_network_params(),
}
model = lightgbm.train(
params,
...
)
...
注意
确保设置 tree_learner 以启用分布式训练。有关更多详细信息,请参阅 LightGBM 文档。如果您使用 Ray Data 加载和分片您的数据集,如快速入门示例所示,您还应该设置 pre_partition=True。
报告指标和保存检查点#
要持久化您的检查点并监控训练进度,请将 ray.train.lightgbm.RayTrainReportCallback 实用回调添加到您的 Trainer 中。
import lightgbm
from ray.train.lightgbm import RayTrainReportCallback
def train_func():
...
bst = lightgbm.train(
...,
callbacks=[
RayTrainReportCallback(
metrics=["eval-multi_logloss"], frequency=1
)
],
)
...
将指标和检查点报告给 Ray Train,可以实现容错训练并与 Ray Tune 集成。
加载数据#
在运行分布式 LightGBM 训练时,每个工作进程应使用数据集的不同分片。
def get_train_dataset(world_rank: int) -> lightgbm.Dataset:
# Define logic to get the Dataset shard for this worker rank
...
def get_eval_dataset(world_rank: int) -> lightgbm.Dataset:
# Define logic to get the Dataset for each worker
...
def train_func():
rank = ray.train.get_world_rank()
train_set = get_train_dataset(rank)
eval_set = get_eval_dataset(rank)
...
一种常见的方法是对数据集进行预分片,然后为每个工作进程分配不同的文件进行读取。
对数据集进行预分片对工作进程数量的变化不够灵活,因为某些工作进程可能被分配比其他工作进程更多的数据。为了更灵活,Ray Data 提供了在运行时分片数据集的解决方案。
使用 Ray Data 对数据集进行分片#
Ray Data 是一个分布式数据处理库,可让您轻松地在多个工作进程之间对数据进行分片和分发。
首先,将您的全部数据集加载为 Ray Data Dataset。有关如何从不同来源加载和预处理数据的更多详细信息,请参阅Ray Data 快速入门。
train_dataset = ray.data.read_parquet("s3://path/to/entire/train/dataset/dir")
eval_dataset = ray.data.read_parquet("s3://path/to/entire/eval/dataset/dir")
在训练函数中,您可以使用 ray.train.get_dataset_shard() 访问此工作进程的数据集分片。将其转换为原生的 lightgbm.Dataset。
def get_dataset(dataset_name: str) -> lightgbm.Dataset:
shard = ray.train.get_dataset_shard(dataset_name)
df = shard.materialize().to_pandas()
X, y = df.drop("target", axis=1), df["target"]
return lightgbm.Dataset(X, label=y)
def train_func():
train_set = get_dataset("train")
eval_set = get_dataset("eval")
...
最后,将数据集传递给 Trainer。这将自动将数据集分片到工作进程。这些键必须与在训练函数中调用 get_dataset_shard 时使用的键匹配。
trainer = LightGBMTrainer(..., datasets={"train": train_dataset, "eval": eval_dataset})
trainer.fit()
有关更多详细信息,请参阅 数据加载和预处理。
配置缩放和 GPU#
在训练函数之外,创建一个 ScalingConfig 对象来配置
num_workers- 分布式训练工作节点的数量。use_gpu- 每个工作节点是否应使用 GPU(或 CPU)。resources_per_worker- 每个工作进程的 CPU 或 GPU 数量。
from ray.train import ScalingConfig
# 4 nodes with 8 CPUs each.
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 8})
注意
当使用 Ray Data 和 Ray Train 时,请注意不要通过 resources_per_worker 参数请求您集群中的所有可用 CPU。Ray Data 需要 CPU 资源来并行执行数据预处理操作。如果所有 CPU 都分配给训练工作进程,Ray Data 操作可能会成为瓶颈,导致性能下降。一个好的做法是为 Ray Data 操作留出一些 CPU 资源。
例如,如果您的集群每个节点有 8 个 CPU,您可以为训练工作进程分配 6 个 CPU,为 Ray Data 留下 2 个 CPU。
# Allocate 6 CPUs per worker, leaving resources for Ray Data operations
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 6})
为了使用 GPU,您需要在 ScalingConfig 对象中将 use_gpu 参数设置为 True。这将为每个工作进程请求并分配一个 GPU。
# 1 node with 8 CPUs and 4 GPUs each.
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
# 4 nodes with 8 CPUs and 4 GPUs each.
scaling_config = ScalingConfig(num_workers=16, use_gpu=True)
使用 GPU 时,您还需要更新训练函数以使用分配的 GPU。这可以通过将 "device" 参数设置为 "gpu" 来完成。有关 LightGBM GPU 支持的更多详细信息,请参阅 LightGBM GPU 文档。
def train_func():
...
params = {
...,
+ "device": "gpu",
}
bst = lightgbm.train(
params,
...
)
配置持久存储#
创建一个 RunConfig 对象来指定结果(包括检查点和工件)将要保存的路径。
from ray.train import RunConfig
# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")
# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")
# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")
警告
指定一个*共享存储位置*(如云存储或 NFS)对于单节点集群是*可选的*,但对于多节点集群是*必需的*。使用本地路径将在多节点集群的检查点过程中*引发错误*。
有关更多详细信息,请参阅 配置持久存储。
启动训练作业#
总而言之,您现在可以使用 LightGBMTrainer 启动分布式训练作业。
from ray.train.lightgbm import LightGBMTrainer
trainer = LightGBMTrainer(
train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()
访问训练结果#
训练完成后,将返回一个 Result 对象,其中包含有关训练运行的信息,包括训练期间报告的指标和检查点。
result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.
有关更多用法示例,请参阅 检查训练结果。
下一步#
在将 LightGBM 训练脚本转换为使用 Ray Train 后