Monitoring and Logging Metrics#
Ray Train 提供了一个 API,允许通过调用 ray.train.report(metrics, checkpoint) 将指标附加到训练函数的 检查点。结果将从分布式工作节点收集,并传递给 Ray Train 驱动程序进程进行记录。
报告的主要用例包括
只有 rank 0 工作节点报告的结果会附加到检查点。但是,为了确保一致性,train.report() 会充当屏障,并且必须在每个工作节点上调用。要聚合来自多个工作节点的结果,请参阅 如何从不同工作节点获取和聚合结果?。
How to obtain and aggregate results from different workers?#
在实际应用中,您可能希望计算准确率和损失之外的优化指标:召回率、精确率、Fbeta 值等。您还可能希望收集来自多个工作节点的指标。虽然 Ray Train 目前仅报告 rank 0 工作节点的指标,但您可以使用第三方库或机器学习框架的分布式原语来报告来自多个工作节点的指标。
Ray Train 原生支持 TorchMetrics,它为分布式、可扩展的 PyTorch 模型提供了一系列机器学习指标。
以下是一个报告来自所有工作节点的聚合 R2 分数以及平均训练和验证损失的示例。
# First, pip install torchmetrics
# This code is tested with torchmetrics==0.7.3 and torch==1.12.1
import os
import tempfile
import ray.train.torch
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn
import torchmetrics
from torch.optim import Adam
import numpy as np
def train_func(config):
n = 100
# create a toy dataset
X = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
X_valid = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
Y = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
Y_valid = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
# toy neural network : 1-layer
# wrap the model in DDP
model = ray.train.torch.prepare_model(nn.Linear(4, 1))
criterion = nn.MSELoss()
mape = torchmetrics.MeanAbsolutePercentageError()
# for averaging loss
mean_valid_loss = torchmetrics.MeanMetric()
optimizer = Adam(model.parameters(), lr=3e-4)
for epoch in range(config["num_epochs"]):
model.train()
y = model.forward(X)
# compute loss
loss = criterion(y, Y)
# back-propagate loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
# evaluate
model.eval()
with torch.no_grad():
pred = model(X_valid)
valid_loss = criterion(pred, Y_valid)
# save loss in aggregator
mean_valid_loss(valid_loss)
mape(pred, Y_valid)
# collect all metrics
# use .item() to obtain a value that can be reported
valid_loss = valid_loss.item()
mape_collected = mape.compute().item()
mean_valid_loss_collected = mean_valid_loss.compute().item()
with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
torch.save(
model.state_dict(), os.path.join(temp_checkpoint_dir, "model.pt")
)
train.report(
{
"mape_collected": mape_collected,
"valid_loss": valid_loss,
"mean_valid_loss_collected": mean_valid_loss_collected,
},
checkpoint=train.Checkpoint.from_directory(temp_checkpoint_dir),
)
# reset for next epoch
mape.reset()
mean_valid_loss.reset()
trainer = TorchTrainer(
train_func,
train_loop_config={"num_epochs": 5},
scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.metrics["valid_loss"], result.metrics["mean_valid_loss_collected"])
# 0.5109779238700867 0.5512474775314331
(Deprecated) Reporting free-floating metrics#
通过每个工作节点的 ray.train.report(metrics, checkpoint=None) 报告指标会将指标写入 Ray Tune 日志文件(progress.csv、result.json),并且可以通过 trainer.fit() 返回的 Result 上的 Result.metrics_dataframe 访问。
从 Ray 2.43 开始,此行为已被弃用,并且在 Ray Train V2 中将不再支持,Ray Train V2 是 Ray Train 的实现和部分 API 的大修。
Ray Train V2 仅保留了一组必要的实验跟踪功能,用于容错,因此它不支持报告未附加到检查点的自由浮动指标。对于指标跟踪的建议是直接从工作节点将指标报告给 MLFlow 和 WandB 等实验跟踪工具。有关示例,请参阅 Experiment Tracking。
在 Ray Train V2 中,仅报告所有工作节点的指标将是一个空操作。但是,仍然可以访问所有工作节点报告的结果来实现自定义指标处理逻辑。
import os
assert os.environ["RAY_TRAIN_V2_ENABLED"] == "1"
from typing import Any, Dict, List, Optional
import ray.train
import ray.train.torch
def train_fn_per_worker(config):
# Free-floating metrics can be accessed from the callback below.
ray.train.report({"rank": ray.train.get_context().get_world_rank()})
class CustomMetricsCallback(ray.train.UserCallback):
def after_report(
self,
run_context,
metrics: List[Dict[str, Any]],
checkpoint: Optional[ray.train.Checkpoint],
):
rank_0_metrics = metrics[0]
print(rank_0_metrics)
# Ex: Write metrics to a file...
trainer = ray.train.torch.TorchTrainer(
train_fn_per_worker,
scaling_config=ray.train.ScalingConfig(num_workers=2),
run_config=ray.train.RunConfig(callbacks=[CustomMetricsCallback()]),
)
trainer.fit()
要使用依赖于工作节点报告的自由浮动指标的 Ray Tune Callbacks,请 将 Ray Train 作为单个 Ray Tune 试运行。
有关更多信息,请参阅以下资源:
Train V2 REP: Train V2 API 更改的技术细节
Train V2 迁移指南:Train V2 的完整迁移指南。