使用 XGBoost 开始分布式训练#
本教程将引导您完成将现有 XGBoost 脚本转换为使用 Ray Train 的过程。
了解如何
配置训练函数以报告指标和保存检查点。
配置训练作业的扩缩容以及 CPU 或 GPU 资源需求。
使用
XGBoostTrainer
启动分布式训练作业。
快速入门#
作为参考,最终代码看起来会像这样
import ray.train
from ray.train.xgboost import XGBoostTrainer
def train_func():
# Your XGBoost training code here.
...
scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 4})
trainer = XGBoostTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
train_func
是在每个分布式训练 Worker 上执行的 Python 代码。ScalingConfig
定义了分布式训练 Worker 的数量以及是否使用 GPU。XGBoostTrainer
启动分布式训练作业。
比较使用 Ray Train 和不使用 Ray Train 的 XGBoost 训练脚本。
import xgboost
import ray.train
from ray.train.xgboost import XGBoostTrainer, RayTrainReportCallback
# 1. Load your data as a Ray Data Dataset.
train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/train")
eval_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/val")
def train_func():
# 2. Load your data shard as an `xgboost.DMatrix`.
# Get dataset shards for this worker
train_shard = ray.train.get_dataset_shard("train")
eval_shard = ray.train.get_dataset_shard("eval")
# Convert shards to pandas DataFrames
train_df = train_shard.materialize().to_pandas()
eval_df = eval_shard.materialize().to_pandas()
train_X = train_df.drop("target", axis=1)
train_y = train_df["target"]
eval_X = eval_df.drop("target", axis=1)
eval_y = eval_df["target"]
dtrain = xgboost.DMatrix(train_X, label=train_y)
deval = xgboost.DMatrix(eval_X, label=eval_y)
# 3. Define your xgboost model training parameters.
params = {
"tree_method": "approx",
"objective": "reg:squarederror",
"eta": 1e-4,
"subsample": 0.5,
"max_depth": 2,
}
# 4. Do distributed data-parallel training.
# Ray Train sets up the necessary coordinator processes and
# environment variables for your workers to communicate with each other.
bst = xgboost.train(
params,
dtrain=dtrain,
evals=[(deval, "validation")],
num_boost_round=10,
# Optional: Use the `RayTrainReportCallback` to save and report checkpoints.
callbacks=[RayTrainReportCallback()],
)
# 5. Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 2})
# 6. Launch distributed training job.
trainer = XGBoostTrainer(
train_func,
scaling_config=scaling_config,
datasets={"train": train_dataset, "eval": eval_dataset},
# If running in a multi-node cluster, this is where you
# should configure the run's persistent storage that is accessible
# across all worker nodes.
# run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
# 7. Load the trained model
import os
with result.checkpoint.as_directory() as checkpoint_dir:
model_path = os.path.join(checkpoint_dir, RayTrainReportCallback.CHECKPOINT_NAME)
model = xgboost.Booster()
model.load_model(model_path)
import pandas as pd
import xgboost
# 1. Load your data as an `xgboost.DMatrix`.
train_df = pd.read_csv("s3://ray-example-data/iris/train/1.csv")
eval_df = pd.read_csv("s3://ray-example-data/iris/val/1.csv")
train_X = train_df.drop("target", axis=1)
train_y = train_df["target"]
eval_X = eval_df.drop("target", axis=1)
eval_y = eval_df["target"]
dtrain = xgboost.DMatrix(train_X, label=train_y)
deval = xgboost.DMatrix(eval_X, label=eval_y)
# 2. Define your xgboost model training parameters.
params = {
"tree_method": "approx",
"objective": "reg:squarederror",
"eta": 1e-4,
"subsample": 0.5,
"max_depth": 2,
}
# 3. Do non-distributed training.
bst = xgboost.train(
params,
dtrain=dtrain,
evals=[(deval, "validation")],
num_boost_round=10,
)
设置训练函数#
首先,更新您的训练代码以支持分布式训练。将您的 native 或 scikit-learn estimator XGBoost 训练代码封装在一个 训练函数中。
def train_func():
# Your native XGBoost training code here.
dmatrix = ...
xgboost.train(...)
每个分布式训练 Worker 都会执行此函数。
您还可以通过 Trainer 的 train_loop_config
将 train_func
的输入参数指定为字典。例如
def train_func(config):
label_column = config["label_column"]
num_boost_round = config["num_boost_round"]
...
config = {"label_column": "y", "num_boost_round": 10}
trainer = ray.train.xgboost.XGBoostTrainer(train_func, train_loop_config=config, ...)
警告
避免通过 train_loop_config
传递大型数据对象,以减少序列化和反序列化开销。相反,直接在 train_func
中初始化大型对象(例如数据集、模型)。
def load_dataset():
# Return a large in-memory dataset
...
def load_model():
# Return a large in-memory model instance
...
-config = {"data": load_dataset(), "model": load_model()}
def train_func(config):
- data = config["data"]
- model = config["model"]
+ data = load_dataset()
+ model = load_model()
...
trainer = ray.train.xgboost.XGBoostTrainer(train_func, train_loop_config=config, ...)
Ray Train 会自动执行分布式 XGBoost 训练所需的 Worker 通信设置。
报告指标和保存检查点#
为了持久化检查点和监控训练进度,请为您的 Trainer 添加一个 ray.train.xgboost.RayTrainReportCallback
工具回调。
import xgboost
from ray.train.xgboost import RayTrainReportCallback
def train_func():
...
bst = xgboost.train(
...,
callbacks=[
RayTrainReportCallback(
metrics=["eval-logloss"], frequency=1
)
],
)
...
向 Ray Train 报告指标和检查点可以启用容错训练,并与 Ray Tune 集成。
加载数据#
运行分布式 XGBoost 训练时,每个 Worker 应使用数据集的不同分片。
def get_train_dataset(world_rank: int) -> xgboost.DMatrix:
# Define logic to get the DMatrix shard for this worker rank
...
def get_eval_dataset(world_rank: int) -> xgboost.DMatrix:
# Define logic to get the DMatrix for each worker
...
def train_func():
rank = ray.train.get_world_rank()
dtrain = get_train_dataset(rank)
deval = get_eval_dataset(rank)
...
一种常见方法是预先对数据集进行分片,然后为每个 Worker 分配一组不同的文件来读取。
预先分片数据集对于 Worker 数量的变化不够灵活,因为一些 Worker 可能会被分配比其他 Worker 更多的数据。为了获得更高的灵活性,Ray Data 提供了一种在运行时对数据集进行分片的解决方案。
使用 Ray Data 对数据集进行分片#
Ray Data 是一个分布式数据处理库,可让您轻松地将数据分片并分布到多个 Worker 上。
首先,将您的整个数据集作为 Ray Data Dataset 加载。有关如何从不同来源加载和预处理数据的更多详细信息,请参阅Ray Data 快速入门。
train_dataset = ray.data.read_parquet("s3://path/to/entire/train/dataset/dir")
eval_dataset = ray.data.read_parquet("s3://path/to/entire/eval/dataset/dir")
在训练函数中,您可以使用 ray.train.get_dataset_shard()
访问此 Worker 的数据集分片。将其转换为 native xgboost.DMatrix。
def get_dmatrix(dataset_name: str) -> xgboost.DMatrix:
shard = ray.train.get_dataset_shard(dataset_name)
df = shard.materialize().to_pandas()
X, y = df.drop("target", axis=1), df["target"]
return xgboost.DMatrix(X, label=y)
def train_func():
dtrain = get_dmatrix("train")
deval = get_dmatrix("eval")
...
最后,将数据集传递给 Trainer。这将自动在 Worker 之间分片数据集。这些键必须与在训练函数中调用 get_dataset_shard
时使用的键匹配。
trainer = XGBoostTrainer(..., datasets={"train": train_dataset, "eval": eval_dataset})
trainer.fit()
更多详细信息,请参阅数据加载和预处理。
配置规模和 GPU#
在训练函数外部,创建一个 ScalingConfig
对象来配置
num_workers
- 分布式训练 Worker 进程的数量。use_gpu
- 每个 Worker 是否应使用 GPU(或 CPU)。resources_per_worker
- 每个 Worker 的 CPU 或 GPU 数量。
from ray.train import ScalingConfig
# 4 nodes with 8 CPUs each.
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 8})
注意
将 Ray Data 与 Ray Train 一起使用时,请注意不要使用 resources_per_worker
参数请求集群中所有可用的 CPU。Ray Data 需要 CPU 资源来并行执行数据预处理操作。如果所有 CPU 都分配给训练 Worker,Ray Data 操作可能会成为瓶颈,导致性能下降。一个好的实践是保留一部分 CPU 资源供 Ray Data 操作使用。
例如,如果您的集群每个节点有 8 个 CPU,您可以分配 6 个 CPU 给训练 Worker,并保留 2 个 CPU 给 Ray Data
# Allocate 6 CPUs per worker, leaving resources for Ray Data operations
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 6})
要使用 GPU,您需要在 ScalingConfig
对象中将 use_gpu
参数设置为 True
。这将为每个 Worker 请求并分配一个 GPU。
# 4 nodes with 8 CPUs and 4 GPUs each.
scaling_config = ScalingConfig(num_workers=16, use_gpu=True)
使用 GPU 时,您还需要更新训练函数以使用分配的 GPU。这可以通过将 "device"
参数设置为 "cuda"
来完成。有关 XGBoost GPU 支持的更多详细信息,请参阅 XGBoost GPU 文档。
def train_func():
...
params = {
...,
+ "device": "cuda",
}
bst = xgboost.train(
params,
...
)
配置持久存储#
创建一个 RunConfig
对象,指定结果(包括检查点和 artifacts)的保存路径。
from ray.train import RunConfig
# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")
# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")
# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")
警告
对于单节点集群,指定共享存储位置(例如云存储或 NFS)是可选的,但对于多节点集群是必需的。对于多节点集群,使用本地路径将在检查点过程中引发错误。
更多详细信息,请参阅配置持久存储。
启动训练作业#
将所有这些结合起来,您现在可以使用 XGBoostTrainer
启动分布式训练作业了。
from ray.train.xgboost import XGBoostTrainer
trainer = XGBoostTrainer(
train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()
访问训练结果#
训练完成后,将返回一个 Result
对象,其中包含有关训练运行的信息,包括训练期间报告的指标和检查点。
result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.
更多使用示例,请参阅检查训练结果。
后续步骤#
将 XGBoost 训练脚本转换为使用 Ray Train 后