Experiment Tracking#

Most experiment tracking libraries work with Ray Train out of the box. This guide explains how to set up your code so that your favorite experiment tracking library works with Ray Train's distributed training. Common errors are listed at the end of the guide to help debug the setup.

The following pseudocode demonstrates how to use native experiment tracking library calls with Ray Train:

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    # Training code and native experiment tracking library calls go here.
    ...

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()

Ray Train lets you use native experiment tracking libraries by customizing the tracking logic inside the train_func function. This way, you can port your experiment tracking logic to Ray Train with only minimal changes.

Getting Started#

Let's start with some code snippets.

The following examples use Weights & Biases (W&B) and MLflow, but the same approach applies to other frameworks.

W&B
import ray
from ray import train
import wandb

# Step 1
# This ensures that all ray worker processes have `WANDB_API_KEY` set.
ray.init(runtime_env={"env_vars": {"WANDB_API_KEY": "your_api_key"}})

def train_func():
    # Step 1 and 2
    if train.get_context().get_world_rank() == 0:
        wandb.init(
            name=...,
            project=...,
            # ...
        )

    # ...
    loss = optimize()
    metrics = {"loss": loss}

    # Step 3
    if train.get_context().get_world_rank() == 0:
        wandb.log(metrics)

    # ...

    # Step 4
    # Make sure that all logs are uploaded to the W&B backend.
    if train.get_context().get_world_rank() == 0:
        wandb.finish()
MLflow
import os

from ray import train
import mlflow

# Run the following on the head node:
# $ databricks configure --token
# $ mv ~/.databrickscfg YOUR_SHARED_STORAGE_PATH
# This function assumes `databricks_config_file` is specified in the Trainer's `train_loop_config`.
def train_func(config):
    # Step 1 and 2
    os.environ["DATABRICKS_CONFIG_FILE"] = config["databricks_config_file"]
    mlflow.set_tracking_uri("databricks")
    mlflow.set_experiment(experiment_id=...)
    mlflow.start_run()

    # ...

    loss = optimize()

    metrics = {"loss": loss}
    # Only report the results from the first worker to MLflow
    # to avoid duplication.

    # Step 3
    if train.get_context().get_world_rank() == 0:
        mlflow.log_metrics(metrics)

    # ...

    # Step 4
    # Make sure that all logs are uploaded to the MLflow backend.
    if train.get_context().get_world_rank() == 0:
        mlflow.end_run()

Tip

A major difference between distributed and non-distributed training is that in distributed training, multiple processes run in parallel, and under certain setups they produce identical results. If every process reports results to the tracking backend, you end up with duplicated entries. To work around this, Ray Train lets you apply the logging logic only to the rank 0 worker, using ray.train.get_context().get_world_rank() as follows:

from ray import train
def train_func():
    ...
    if train.get_context().get_world_rank() == 0:
        # Add your logging logic only for the rank 0 worker.
        ...
    ...

Interaction with the experiment tracking backend inside train_func consists of 4 logical steps:

  1. Set up the connection to the tracking backend

  2. Configure and launch a run

  3. Log metrics

  4. Finish the run

More details on each step follow.

Step 1: Connect to your tracking backend#

First, decide which tracking backend to use: W&B, MLflow, TensorBoard, Comet, etc. If applicable, make sure that you correctly set up credentials on every training worker.

W&B offers two modes: online and offline.

Online mode

For online mode, because you are logging to W&B's tracking service, make sure you set up credentials inside train_func. For more information, see Set up credentials.

# This is equivalent to `os.environ["WANDB_API_KEY"] = "your_api_key"`
wandb.login(key="your_api_key")

Offline mode

For offline mode, because you are logging to the local file system, point the offline directory to a shared storage path that all nodes can write to. For more information, see Set up a shared file system.

os.environ["WANDB_MODE"] = "offline"
wandb.init(dir="some_shared_storage_path/wandb")

MLflow offers two modes: local and remote (for example, to Databricks' hosted MLflow service).

Local mode

For local mode, because you are logging to the local file system, point the tracking directory to a shared storage path that all nodes can write to. For more information, see Set up a shared file system.

mlflow.set_tracking_uri("file:some_shared_storage_path/mlruns")
mlflow.start_run()

Remote mode, hosted by Databricks

Make sure that all nodes can access the Databricks config file. For more information, see Set up credentials.

# The MLflow client looks for a Databricks config file
# at the location specified by `os.environ["DATABRICKS_CONFIG_FILE"]`.
os.environ["DATABRICKS_CONFIG_FILE"] = config["databricks_config_file"]
mlflow.set_tracking_uri("databricks")
mlflow.start_run()

Set up credentials#

Refer to each tracking library's API documentation for how to set up credentials. This step usually involves setting environment variables or accessing a config file.

The simplest way to pass environment variable credentials to the training workers is through runtime environments, which you can initialize with the following code:

import ray
# This makes sure that training workers have the same env var set
ray.init(runtime_env={"env_vars": {"SOME_API_KEY": "your_api_key"}})

For accessing config files, make sure that all nodes can access the file. One way is to set up shared storage. Another is to keep a copy on each node.
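
If you prefer not to set up shared storage, one possible way to get a copy onto each node is to ship the file through the runtime environment's working_dir. The following is a minimal sketch, assuming a local directory named my_config_dir that contains a .databrickscfg file (both names are placeholders for illustration):

import os

import ray

# Upload a local directory containing the config file to the cluster.
# Every Ray worker starts with this directory as its working directory.
ray.init(runtime_env={"working_dir": "my_config_dir"})


def train_func(config):
    # The uploaded copy of the config file is available relative to the
    # worker's working directory.
    os.environ["DATABRICKS_CONFIG_FILE"] = os.path.abspath(".databrickscfg")
    # ... set up MLflow and run training as usual ...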

Set up a shared file system#

Set up a network file system that all nodes in the cluster can access, for example AWS EFS or Google Cloud Filestore.
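
Once the file system is mounted on every node, a quick sanity check inside train_func can catch a misconfigured mount early. The sketch below assumes a mount point of /mnt/cluster_storage; substitute your own path:

import os

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

SHARED_STORAGE_PATH = "/mnt/cluster_storage"  # assumed NFS/EFS mount point


def train_func(config):
    shared_dir = config["shared_dir"]
    # Fail fast if the shared mount isn't visible from this worker's node.
    assert os.path.isdir(shared_dir), f"{shared_dir} is not accessible on this node."
    # Point your tracking library's offline/local directory at `shared_dir`.


trainer = TorchTrainer(
    train_func,
    train_loop_config={"shared_dir": SHARED_STORAGE_PATH},
    scaling_config=ScalingConfig(num_workers=2),
)
trainer.fit()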

Step 2: Configure and launch a run#

This step usually involves choosing an identifier for the run and associating it with a project. Refer to your tracking library's documentation for the exact semantics.

Tip

When running fault-tolerant training with auto-restoration, use a consistent ID to configure all logical tracking runs that belong to the same training run, as sketched below.
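
For example, with W&B you can pass a stable, user-chosen run ID and allow resuming, so that a restored training run keeps logging to the same tracking run instead of creating a new one. This is a minimal sketch; the project name and the run_id config key are placeholders:

import wandb
from ray import train


def train_func(config):
    if train.get_context().get_world_rank() == 0:
        wandb.init(
            project="my_project",    # placeholder project name
            id=config["run_id"],     # the same ID across restarts
            resume="allow",          # attach to the run if it already exists
        )
    # ...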

Step 3: Log metrics#

Inside train_func, you can customize how you log parameters, metrics, models, or media content, just as you would in a non-distributed training script. You can also use the native integrations between a specific tracking framework and a specific training framework, for example mlflow.pytorch.autolog(), lightning.pytorch.loggers.MLFlowLogger, and so on.
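
As a sketch of what rank-0-gated logging can look like with MLflow (the config keys and the dummy loss below are illustrative placeholders):

from ray import train
import mlflow


def train_func(config):
    # Steps 1 and 2 (connect and start a run) are assumed to have happened above.
    if train.get_context().get_world_rank() == 0:
        mlflow.log_params({"lr": config["lr"], "batch_size": config["batch_size"]})

    for epoch in range(config["num_epochs"]):
        loss = 1.0 / (epoch + 1)  # placeholder for the real training loss
        if train.get_context().get_world_rank() == 0:
            mlflow.log_metrics({"loss": loss}, step=epoch)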

Step 4: Finish the run#

This step ensures that all logs are synced to the tracking service. Depending on the tracking library's implementation, logs are sometimes cached locally first and only synced to the tracking service asynchronously. Finishing the run ensures that all logs are synced by the time the training workers exit.

# https://docs.wandb.ai/ref/python/finish
wandb.finish()
# https://mlflow.org/docs/1.2.0/python_api/mlflow.html
mlflow.end_run()
# https://www.comet.com/docs/v2/api-and-sdk/python-sdk/reference/Experiment/#experimentend
Experiment.end()

Examples#

The following are runnable examples for PyTorch and PyTorch Lightning.

PyTorch#

Log to W&B
from filelock import FileLock
import os

import torch
import wandb

from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.models import resnet18

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Run the following script with the WANDB_API_KEY env var set.
assert os.environ.get("WANDB_API_KEY", None), "Please set WANDB_API_KEY env var."

# This makes sure that all workers have this env var set.
ray.init(
    runtime_env={"env_vars": {"WANDB_API_KEY": os.environ["WANDB_API_KEY"]}}
)


def train_func(config):
    if ray.train.get_context().get_world_rank() == 0:
        wandb.init()

    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    model = ray.train.torch.prepare_model(model)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.module.parameters(), lr=0.001)

    # Data
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.28604,), (0.32025,))]
    )
    with FileLock("./data.lock"):
        train_data = datasets.FashionMNIST(
            root="./data", train=True, download=True, transform=transform
        )
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(1):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if ray.train.get_context().get_world_rank() == 0:
                wandb.log({"loss": loss, "epoch": epoch})

    if ray.train.get_context().get_world_rank() == 0:
        wandb.finish()


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2),
)
trainer.fit()
Log to file-based MLflow
# Run the following script with the SHARED_STORAGE_PATH env var set.
# The MLflow offline logs are saved to SHARED_STORAGE_PATH/mlruns.

from filelock import FileLock
import mlflow
import os
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
from torchvision import datasets, transforms
from torchvision.models import resnet18
from torch.utils.data import DataLoader

assert os.environ.get(
    "SHARED_STORAGE_PATH", None
), "Please set SHARED_STORAGE_PATH env var."


# Assumes you are passing a `save_dir` in `config`
def train_func(config):
    save_dir = config["save_dir"]
    if ray.train.get_context().get_world_rank() == 0:
        mlflow.set_tracking_uri(f"file:{save_dir}")
        mlflow.set_experiment("my_experiment")
        mlflow.start_run()

    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    model = ray.train.torch.prepare_model(model)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.module.parameters(), lr=0.001)

    # Data
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.28604,), (0.32025,))]
    )
    with FileLock("./data.lock"):
        train_data = datasets.FashionMNIST(
            root="./data", train=True, download=True, transform=transform
        )
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(1):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if ray.train.get_context().get_world_rank() == 0:
                mlflow.log_metrics({"loss": loss.item(), "epoch": epoch})

    if ray.train.get_context().get_world_rank() == 0:
        mlflow.end_run()


trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "save_dir": os.path.join(os.environ["SHARED_STORAGE_PATH"], "mlruns")
    },
    scaling_config=ScalingConfig(num_workers=2),
)
trainer.fit()

PyTorch Lightning#

When using Ray Train's TorchTrainer, you can use PyTorch Lightning's native Logger integrations with W&B, CometML, MLflow, and TensorBoard.

The following examples walk you through the process. The code here is runnable.

W&B
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os
import pytorch_lightning as pl
import wandb
from pytorch_lightning.loggers.wandb import WandbLogger
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = WandbLogger(name="demo-run", project="demo-project")

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)
    if ray.train.get_context().get_world_rank() == 0:
        wandb.finish()


scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "WANDB_API_KEY" in os.environ
), 'Please set WANDB_API_KEY="abcde" when running this script.'

# This ensures that all workers have this env var set.
ray.init(
    runtime_env={"env_vars": {"WANDB_API_KEY": os.environ["WANDB_API_KEY"]}}
)
trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
)

trainer.fit()
MLflow
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os
import pytorch_lightning as pl
from pytorch_lightning.loggers.mlflow import MLFlowLogger

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):

    save_dir = config["save_dir"]
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = MLFlowLogger(
            experiment_name="demo-project",
            tracking_uri=f"file:{save_dir}",
        )

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)


scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "SHARED_STORAGE_PATH" in os.environ
), "Please set SHARED_STORAGE_PATH=/a/b/c when running this script."

trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "save_dir": os.path.join(os.environ["SHARED_STORAGE_PATH"], "mlruns")
    },
    scaling_config=scaling_config,
)

trainer.fit()
Comet
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os
import pytorch_lightning as pl
from pytorch_lightning.loggers.comet import CometLogger
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = CometLogger(api_key=os.environ["COMET_API_KEY"])

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)


scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "COMET_API_KEY" in os.environ
), 'Please set COMET_API_KEY="abcde" when running this script.'
# This makes sure that all workers have this env var set.
ray.init(runtime_env={"env_vars": {"COMET_API_KEY": os.environ["COMET_API_KEY"]}})
trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
)

trainer.fit()
TensorBoard
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os
import pytorch_lightning as pl
from pytorch_lightning.loggers.tensorboard import TensorBoardLogger

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):

    save_dir = config["save_dir"]
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = TensorBoardLogger(name="demo-run", save_dir=save_dir)

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)


scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "SHARED_STORAGE_PATH" in os.environ
), "Please set SHARED_STORAGE_PATH=/a/b/c when running this script."

trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "save_dir": os.path.join(os.environ["SHARED_STORAGE_PATH"], "tensorboard")
    },
    scaling_config=scaling_config,
)

trainer.fit()

Common Errors#

Missing credentials#

I already called the `wandb login` CLI, but I still get:

wandb: ERROR api_key not configured (no-tty). call wandb.login(key=[your_api_key]).

This is likely because the wandb credentials are not set up correctly on the worker nodes. Make sure you call wandb.login or pass WANDB_API_KEY to every training function. For more details, see Set up credentials.
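
One way to confirm whether the key actually reached the workers is a quick check inside train_func that fails fast with a clear message, for example:

import os

from ray import train


def train_func():
    # Fail fast if the key didn't make it to this worker process.
    assert "WANDB_API_KEY" in os.environ, (
        "WANDB_API_KEY is missing on worker rank "
        f"{train.get_context().get_world_rank()}."
    )
    # ...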

Missing configuration#

I already ran `databricks configure`, but I still get:

databricks_cli.utils.InvalidConfigurationError: You haven't configured the CLI yet!

This usually happens because running databricks configure only generates the ~/.databrickscfg file on the head node. Move this file to a shared location or copy it to every node. For more details, see Set up credentials.
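
One way to verify that the shared (or copied) config file is actually visible from every worker is a quick check like the following, assuming the path is passed through train_loop_config as databricks_config_file:

import os

from ray import train


def train_func(config):
    cfg_path = config["databricks_config_file"]
    # Verify that the config file is readable from this worker's node.
    assert os.path.exists(cfg_path), (
        f"{cfg_path} not found on worker rank "
        f"{train.get_context().get_world_rank()}."
    )
    os.environ["DATABRICKS_CONFIG_FILE"] = cfg_path
    # ...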