Monitoring and Logging Metrics#

Ray Train 提供了一个 API,允许通过调用 ray.train.report(metrics, checkpoint) 将指标附加到训练函数的 检查点。结果将从分布式工作节点收集,并传递给 Ray Train 驱动程序进程进行记录。

报告的主要用例包括

  • 在每个训练 epoch 结束时报告指标(准确率、损失等)。有关用法示例,请参阅 训练期间保存检查点

  • 使用用户定义的验证函数在验证集上验证检查点。有关用法示例,请参阅 异步验证检查点

只有 rank 0 工作节点报告的结果会附加到检查点。但是,为了确保一致性,train.report() 会充当屏障,并且必须在每个工作节点上调用。要聚合来自多个工作节点的结果,请参阅 如何从不同工作节点获取和聚合结果?

How to obtain and aggregate results from different workers?#

在实际应用中,您可能希望计算准确率和损失之外的优化指标:召回率、精确率、Fbeta 值等。您还可能希望收集来自多个工作节点的指标。虽然 Ray Train 目前仅报告 rank 0 工作节点的指标,但您可以使用第三方库或机器学习框架的分布式原语来报告来自多个工作节点的指标。

Ray Train 原生支持 TorchMetrics,它为分布式、可扩展的 PyTorch 模型提供了一系列机器学习指标。

以下是一个报告来自所有工作节点的聚合 R2 分数以及平均训练和验证损失的示例。


# First, pip install torchmetrics
# This code is tested with torchmetrics==0.7.3 and torch==1.12.1

import os
import tempfile

import ray.train.torch
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

import torch
import torch.nn as nn
import torchmetrics
from torch.optim import Adam
import numpy as np


def train_func(config):
    n = 100
    # create a toy dataset
    X = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
    X_valid = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
    Y = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
    Y_valid = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
    # toy neural network : 1-layer
    # wrap the model in DDP
    model = ray.train.torch.prepare_model(nn.Linear(4, 1))
    criterion = nn.MSELoss()

    mape = torchmetrics.MeanAbsolutePercentageError()
    # for averaging loss
    mean_valid_loss = torchmetrics.MeanMetric()

    optimizer = Adam(model.parameters(), lr=3e-4)
    for epoch in range(config["num_epochs"]):
        model.train()
        y = model.forward(X)

        # compute loss
        loss = criterion(y, Y)

        # back-propagate loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # evaluate
        model.eval()
        with torch.no_grad():
            pred = model(X_valid)
            valid_loss = criterion(pred, Y_valid)
            # save loss in aggregator
            mean_valid_loss(valid_loss)
            mape(pred, Y_valid)

        # collect all metrics
        # use .item() to obtain a value that can be reported
        valid_loss = valid_loss.item()
        mape_collected = mape.compute().item()
        mean_valid_loss_collected = mean_valid_loss.compute().item()

        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            torch.save(
                model.state_dict(), os.path.join(temp_checkpoint_dir, "model.pt")
            )

            train.report(
                {
                    "mape_collected": mape_collected,
                    "valid_loss": valid_loss,
                    "mean_valid_loss_collected": mean_valid_loss_collected,
                },
                checkpoint=train.Checkpoint.from_directory(temp_checkpoint_dir),
            )

        # reset for next epoch
        mape.reset()
        mean_valid_loss.reset()


trainer = TorchTrainer(
    train_func,
    train_loop_config={"num_epochs": 5},
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.metrics["valid_loss"], result.metrics["mean_valid_loss_collected"])
# 0.5109779238700867 0.5512474775314331

(Deprecated) Reporting free-floating metrics#

通过每个工作节点的 ray.train.report(metrics, checkpoint=None) 报告指标会将指标写入 Ray Tune 日志文件(progress.csvresult.json),并且可以通过 trainer.fit() 返回的 Result 上的 Result.metrics_dataframe 访问。

从 Ray 2.43 开始,此行为已被弃用,并且在 Ray Train V2 中将不再支持,Ray Train V2 是 Ray Train 的实现和部分 API 的大修。

Ray Train V2 仅保留了一组必要的实验跟踪功能,用于容错,因此它不支持报告未附加到检查点的自由浮动指标。对于指标跟踪的建议是直接从工作节点将指标报告给 MLFlow 和 WandB 等实验跟踪工具。有关示例,请参阅 Experiment Tracking

在 Ray Train V2 中,仅报告所有工作节点的指标将是一个空操作。但是,仍然可以访问所有工作节点报告的结果来实现自定义指标处理逻辑。

import os

assert os.environ["RAY_TRAIN_V2_ENABLED"] == "1"

from typing import Any, Dict, List, Optional

import ray.train
import ray.train.torch


def train_fn_per_worker(config):
    # Free-floating metrics can be accessed from the callback below.
    ray.train.report({"rank": ray.train.get_context().get_world_rank()})


class CustomMetricsCallback(ray.train.UserCallback):
    def after_report(
        self,
        run_context,
        metrics: List[Dict[str, Any]],
        checkpoint: Optional[ray.train.Checkpoint],
    ):
        rank_0_metrics = metrics[0]
        print(rank_0_metrics)
        # Ex: Write metrics to a file...


trainer = ray.train.torch.TorchTrainer(
    train_fn_per_worker,
    scaling_config=ray.train.ScalingConfig(num_workers=2),
    run_config=ray.train.RunConfig(callbacks=[CustomMetricsCallback()]),
)
trainer.fit()

要使用依赖于工作节点报告的自由浮动指标的 Ray Tune Callbacks,请 将 Ray Train 作为单个 Ray Tune 试运行

有关更多信息,请参阅以下资源: