监控和日志记录指标#

Ray Train 提供了一个 API,用于通过调用 检查点 中的训练函数来附加指标,方法是调用 ray.train.report(metrics, checkpoint)。结果将从分布式 worker 收集,并传递给 Ray Train driver 进程进行记录。

报告的主要用例是在每个训练 epoch 结束时记录指标(准确率、损失等)。有关用法示例,请参阅训练期间保存检查点

只有 rank 0 worker 报告的结果会被附加到检查点。然而,为了确保一致性,train.report() 充当了一个屏障,必须在每个 worker 上调用。要聚合来自多个 worker 的结果,请参阅如何获取和聚合来自不同 worker 的结果?

如何获取和聚合来自不同 worker 的结果?#

在实际应用中,您可能希望计算除准确率和损失之外的优化指标:召回率、精确率、Fbeta 等。您可能还希望收集来自多个 worker 的指标。虽然 Ray Train 当前只报告 rank 0 worker 的指标,但您可以使用第三方库或机器学习框架的分布式原语来报告来自多个 worker 的指标。

Ray Train 原生支持 TorchMetrics,它为分布式、可扩展的 PyTorch 模型提供了一系列机器学习指标。

这里有一个示例,展示如何报告聚合的 R2 分数以及所有 worker 的平均训练和验证损失。


# First, pip install torchmetrics
# This code is tested with torchmetrics==0.7.3 and torch==1.12.1

import os
import tempfile

import ray.train.torch
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

import torch
import torch.nn as nn
import torchmetrics
from torch.optim import Adam
import numpy as np


def train_func(config):
    n = 100
    # create a toy dataset
    X = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
    X_valid = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
    Y = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
    Y_valid = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
    # toy neural network : 1-layer
    # wrap the model in DDP
    model = ray.train.torch.prepare_model(nn.Linear(4, 1))
    criterion = nn.MSELoss()

    mape = torchmetrics.MeanAbsolutePercentageError()
    # for averaging loss
    mean_valid_loss = torchmetrics.MeanMetric()

    optimizer = Adam(model.parameters(), lr=3e-4)
    for epoch in range(config["num_epochs"]):
        model.train()
        y = model.forward(X)

        # compute loss
        loss = criterion(y, Y)

        # back-propagate loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # evaluate
        model.eval()
        with torch.no_grad():
            pred = model(X_valid)
            valid_loss = criterion(pred, Y_valid)
            # save loss in aggregator
            mean_valid_loss(valid_loss)
            mape(pred, Y_valid)

        # collect all metrics
        # use .item() to obtain a value that can be reported
        valid_loss = valid_loss.item()
        mape_collected = mape.compute().item()
        mean_valid_loss_collected = mean_valid_loss.compute().item()

        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            torch.save(
                model.state_dict(), os.path.join(temp_checkpoint_dir, "model.pt")
            )

            train.report(
                {
                    "mape_collected": mape_collected,
                    "valid_loss": valid_loss,
                    "mean_valid_loss_collected": mean_valid_loss_collected,
                },
                checkpoint=train.Checkpoint.from_directory(temp_checkpoint_dir),
            )

        # reset for next epoch
        mape.reset()
        mean_valid_loss.reset()


trainer = TorchTrainer(
    train_func,
    train_loop_config={"num_epochs": 5},
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.metrics["valid_loss"], result.metrics["mean_valid_loss_collected"])
# 0.5109779238700867 0.5512474775314331

(已弃用) 报告独立指标#

从每个 worker 调用 ray.train.report(metrics, checkpoint=None) 报告指标会将指标写入 Ray Tune 日志文件 (progress.csv, result.json),并且可以通过 trainer.fit() 返回的 Result 上的 Result.metrics_dataframe 访问。

自 Ray 2.43 起,此行为已弃用,并且在 Ray Train V2 中将不再支持,Ray Train V2 是对 Ray Train 实现和部分 API 的全面改革。

Ray Train V2 只保留了一部分对容错必要的实验追踪功能,因此不支持报告未附加到检查点的独立指标。建议的指标追踪方法是直接从 worker 向 MLFlow 和 WandB 等实验追踪工具报告指标。有关示例,请参阅实验追踪

在 Ray Train V2 中,仅报告所有 worker 的指标是空操作 (no-op)。但是,仍然可以访问所有 worker 报告的结果来实现自定义指标处理逻辑。

import os

assert os.environ["RAY_TRAIN_V2_ENABLED"] == "1"

from typing import Any, Dict, List, Optional

import ray.train
import ray.train.torch


def train_fn_per_worker(config):
    # Free-floating metrics can be accessed from the callback below.
    ray.train.report({"rank": ray.train.get_context().get_world_rank()})


class CustomMetricsCallback(ray.train.UserCallback):
    def after_report(
        self,
        run_context,
        metrics: List[Dict[str, Any]],
        checkpoint: Optional[ray.train.Checkpoint],
    ):
        rank_0_metrics = metrics[0]
        print(rank_0_metrics)
        # Ex: Write metrics to a file...


trainer = ray.train.torch.TorchTrainer(
    train_fn_per_worker,
    scaling_config=ray.train.ScalingConfig(num_workers=2),
    run_config=ray.train.RunConfig(callbacks=[CustomMetricsCallback()]),
)
trainer.fit()

要使用依赖于 worker 报告的独立指标的 Ray Tune Callbacks,请将 Ray Train 作为单个 Ray Tune trial 运行。

有关更多信息,请参阅以下资源