使用 XGBoost 进行分布式训练入门#
本教程将引导您完成将现有 XGBoost 脚本转换为使用 Ray Train 的过程。
了解如何
配置 训练函数 以报告指标和保存检查点。
为训练作业配置 缩放 以及 CPU 或 GPU 资源需求。
使用
XGBoostTrainer启动分布式训练作业。
快速入门#
作为参考,最终代码将如下所示
import ray.train
from ray.train.xgboost import XGBoostTrainer
def train_func():
# Your XGBoost training code here.
...
scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 4})
trainer = XGBoostTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
train_func是在每个分布式训练工作节点上执行的 Python 代码。ScalingConfig定义了分布式训练工作节点的数量以及是否使用 GPU。XGBoostTrainer启动分布式训练作业。
比较使用 Ray Train 和不使用 Ray Train 的 XGBoost 训练脚本。
import xgboost
import ray.train
from ray.train.xgboost import XGBoostTrainer, RayTrainReportCallback
# 1. Load your data as a Ray Data Dataset.
train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/train")
eval_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/val")
def train_func():
# 2. Load your data shard as an `xgboost.DMatrix`.
# Get dataset shards for this worker
train_shard = ray.train.get_dataset_shard("train")
eval_shard = ray.train.get_dataset_shard("eval")
# Convert shards to pandas DataFrames
train_df = train_shard.materialize().to_pandas()
eval_df = eval_shard.materialize().to_pandas()
train_X = train_df.drop("target", axis=1)
train_y = train_df["target"]
eval_X = eval_df.drop("target", axis=1)
eval_y = eval_df["target"]
dtrain = xgboost.DMatrix(train_X, label=train_y)
deval = xgboost.DMatrix(eval_X, label=eval_y)
# 3. Define your xgboost model training parameters.
params = {
"tree_method": "approx",
"objective": "reg:squarederror",
"eta": 1e-4,
"subsample": 0.5,
"max_depth": 2,
}
# 4. Do distributed data-parallel training.
# Ray Train sets up the necessary coordinator processes and
# environment variables for your workers to communicate with each other.
bst = xgboost.train(
params,
dtrain=dtrain,
evals=[(deval, "validation")],
num_boost_round=10,
# Optional: Use the `RayTrainReportCallback` to save and report checkpoints.
callbacks=[RayTrainReportCallback()],
)
# 5. Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 2})
# 6. Launch distributed training job.
trainer = XGBoostTrainer(
train_func,
scaling_config=scaling_config,
datasets={"train": train_dataset, "eval": eval_dataset},
# If running in a multi-node cluster, this is where you
# should configure the run's persistent storage that is accessible
# across all worker nodes.
# run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
# 7. Load the trained model
import os
with result.checkpoint.as_directory() as checkpoint_dir:
model_path = os.path.join(checkpoint_dir, RayTrainReportCallback.CHECKPOINT_NAME)
model = xgboost.Booster()
model.load_model(model_path)
import pandas as pd
import xgboost
# 1. Load your data as an `xgboost.DMatrix`.
train_df = pd.read_csv("s3://ray-example-data/iris/train/1.csv")
eval_df = pd.read_csv("s3://ray-example-data/iris/val/1.csv")
train_X = train_df.drop("target", axis=1)
train_y = train_df["target"]
eval_X = eval_df.drop("target", axis=1)
eval_y = eval_df["target"]
dtrain = xgboost.DMatrix(train_X, label=train_y)
deval = xgboost.DMatrix(eval_X, label=eval_y)
# 2. Define your xgboost model training parameters.
params = {
"tree_method": "approx",
"objective": "reg:squarederror",
"eta": 1e-4,
"subsample": 0.5,
"max_depth": 2,
}
# 3. Do non-distributed training.
bst = xgboost.train(
params,
dtrain=dtrain,
evals=[(deval, "validation")],
num_boost_round=10,
)
设置训练函数#
首先,更新您的训练代码以支持分布式训练。开始时,将您的 原生 或 scikit-learn estimator XGBoost 训练代码包装在 训练函数 中。
def train_func():
# Your native XGBoost training code here.
dmatrix = ...
xgboost.train(...)
每个分布式训练工作节点都会执行此函数。
您也可以通过 Trainer 的 train_loop_config 将 train_func 的输入参数指定为字典。例如
def train_func(config):
label_column = config["label_column"]
num_boost_round = config["num_boost_round"]
...
config = {"label_column": "y", "num_boost_round": 10}
trainer = ray.train.xgboost.XGBoostTrainer(train_func, train_loop_config=config, ...)
警告
避免通过 train_loop_config 传递大型数据对象,以减少序列化和反序列化的开销。相反,直接在 train_func 中初始化大型对象(例如数据集、模型)。
def load_dataset():
# Return a large in-memory dataset
...
def load_model():
# Return a large in-memory model instance
...
-config = {"data": load_dataset(), "model": load_model()}
def train_func(config):
- data = config["data"]
- model = config["model"]
+ data = load_dataset()
+ model = load_model()
...
trainer = ray.train.xgboost.XGBoostTrainer(train_func, train_loop_config=config, ...)
Ray Train 会自动执行分布式 XGBoost 训练所需的 worker 通信设置。
报告指标和保存检查点#
要持久化您的检查点并监控训练进度,请将 ray.train.xgboost.RayTrainReportCallback 工具回调添加到您的 Trainer 中。
import xgboost
from ray.train.xgboost import RayTrainReportCallback
def train_func():
...
bst = xgboost.train(
...,
callbacks=[
RayTrainReportCallback(
metrics=["eval-logloss"], frequency=1
)
],
)
...
将指标和检查点报告给 Ray Train 可以实现 容错训练 以及与 Ray Tune 集成。
加载数据#
运行分布式 XGBoost 训练时,每个 worker 都应使用数据集的不同分片。
def get_train_dataset(world_rank: int) -> xgboost.DMatrix:
# Define logic to get the DMatrix shard for this worker rank
...
def get_eval_dataset(world_rank: int) -> xgboost.DMatrix:
# Define logic to get the DMatrix for each worker
...
def train_func():
rank = ray.train.get_world_rank()
dtrain = get_train_dataset(rank)
deval = get_eval_dataset(rank)
...
一种常见的方法是预先分片数据集,然后为每个 worker 分配不同的文件进行读取。
预分片数据集对于 worker 数量的变化不够灵活,因为有些 worker 可能分配了比其他 worker 更多的数据。为了更灵活,Ray Data 提供了在运行时分片数据集的解决方案。
使用 Ray Data 分片数据集#
Ray Data 是一个分布式数据处理库,允许您轻松地将数据分片并分发到多个 worker。
首先,将您的整个数据集加载为 Ray Data 数据集。有关如何从不同源加载和预处理数据的更多详细信息,请参阅 Ray Data 快速入门。
train_dataset = ray.data.read_parquet("s3://path/to/entire/train/dataset/dir")
eval_dataset = ray.data.read_parquet("s3://path/to/entire/eval/dataset/dir")
在训练函数中,您可以使用 ray.train.get_dataset_shard() 访问该 worker 的数据集分片。将其转换为原生的 xgboost.DMatrix。
def get_dmatrix(dataset_name: str) -> xgboost.DMatrix:
shard = ray.train.get_dataset_shard(dataset_name)
df = shard.materialize().to_pandas()
X, y = df.drop("target", axis=1), df["target"]
return xgboost.DMatrix(X, label=y)
def train_func():
dtrain = get_dmatrix("train")
deval = get_dmatrix("eval")
...
最后,将数据集传递给 Trainer。这将自动将数据集分片到 worker 之间。这些键必须与调用训练函数中的 get_dataset_shard 时使用的键匹配。
trainer = XGBoostTrainer(..., datasets={"train": train_dataset, "eval": eval_dataset})
trainer.fit()
有关更多详细信息,请参阅 数据加载和预处理。
配置缩放和 GPU#
在训练函数之外,创建一个 ScalingConfig 对象来配置
num_workers- 分布式训练工作节点的数量。use_gpu- 每个工作节点是否应使用 GPU(或 CPU)。resources_per_worker- 每个 worker 的 CPU 或 GPU 数量。
from ray.train import ScalingConfig
# 4 nodes with 8 CPUs each.
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 8})
注意
当使用 Ray Data 和 Ray Train 时,请注意不要在 resources_per_worker 参数中请求集群中的所有可用 CPU。Ray Data 需要 CPU 资源来并行执行数据预处理操作。如果所有 CPU 都分配给训练 worker,Ray Data 操作可能会成为瓶颈,导致性能下降。一个好的做法是为 Ray Data 操作保留一部分 CPU 资源。
例如,如果您的集群每节点有 8 个 CPU,您可以为训练 worker 分配 6 个 CPU,为 Ray Data 留下 2 个 CPU。
# Allocate 6 CPUs per worker, leaving resources for Ray Data operations
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 6})
为了使用 GPU,您需要在 ScalingConfig 对象中将 use_gpu 参数设置为 True。这将为每个 worker 请求并分配一个 GPU。
# 4 nodes with 8 CPUs and 4 GPUs each.
scaling_config = ScalingConfig(num_workers=16, use_gpu=True)
使用 GPU 时,您还需要更新训练函数以使用分配的 GPU。这可以通过将 "device" 参数设置为 "cuda" 来实现。有关 XGBoost GPU 支持的更多详细信息,请参阅 XGBoost GPU 文档。
def train_func():
...
params = {
...,
+ "device": "cuda",
}
bst = xgboost.train(
params,
...
)
配置持久存储#
创建一个 RunConfig 对象来指定结果(包括检查点和工件)将要保存的路径。
from ray.train import RunConfig
# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")
# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")
# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")
警告
指定一个*共享存储位置*(如云存储或 NFS)对于单节点集群是*可选的*,但对于多节点集群是*必需的*。使用本地路径将在多节点集群的检查点过程中*引发错误*。
有关更多详细信息,请参阅 配置持久存储。
启动训练作业#
将所有这些整合在一起,您现在可以使用 XGBoostTrainer 启动分布式训练作业。
from ray.train.xgboost import XGBoostTrainer
trainer = XGBoostTrainer(
train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()
访问训练结果#
训练完成后,将返回一个 Result 对象,其中包含有关训练运行的信息,包括训练期间报告的指标和检查点。
result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.
有关更多用法示例,请参阅 检查训练结果。
下一步#
将 XGBoost 训练脚本转换为使用 Ray Train 后