Experiment Tracking#
Most experiment tracking libraries work out of the box with Ray Train. This guide shows how to set up your code so that your favorite experiment tracking library works with Ray Train for distributed training. It ends with a list of common errors to help you debug your setup.
The following pseudocode demonstrates how to use a native experiment tracking library with Ray Train:
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    # Training code and native experiment tracking library calls go here.
    ...

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
Ray Train lets you use a native experiment tracking library by putting the tracking logic inside your train_func. This way, you can port your experiment tracking logic to Ray Train with minimal changes.
Getting Started#
Let's start with some code snippets.
The following examples use Weights & Biases (W&B) and MLflow, but they can be adapted to other frameworks.
import ray
from ray import train
import wandb

# Step 1
# This ensures that all Ray worker processes have `WANDB_API_KEY` set.
ray.init(runtime_env={"env_vars": {"WANDB_API_KEY": "your_api_key"}})

def train_func():
    # Step 1 and 2
    if train.get_context().get_world_rank() == 0:
        wandb.init(
            name=...,
            project=...,
            # ...
        )

    # ...
    loss = optimize()
    metrics = {"loss": loss}

    # Step 3
    if train.get_context().get_world_rank() == 0:
        # Only report the results from the rank 0 worker to W&B to avoid duplication.
        wandb.log(metrics)

    # ...

    # Step 4
    # Make sure that all logs are uploaded to the W&B backend.
    if train.get_context().get_world_rank() == 0:
        wandb.finish()
import os

from ray import train
import mlflow

# Run the following on the head node:
# $ databricks configure --token
# $ mv ~/.databrickscfg YOUR_SHARED_STORAGE_PATH

# This function assumes `databricks_config_file` is specified in the Trainer's `train_loop_config`.
def train_func(config):
    # Step 1 and 2
    os.environ["DATABRICKS_CONFIG_FILE"] = config["databricks_config_file"]
    mlflow.set_tracking_uri("databricks")
    mlflow.set_experiment(experiment_id=...)
    mlflow.start_run()

    # ...
    loss = optimize()
    metrics = {"loss": loss}

    # Step 3
    if train.get_context().get_world_rank() == 0:
        # Only report the results from the rank 0 worker to MLflow to avoid duplication.
        mlflow.log_metrics(metrics)
Tip
A key difference between distributed and non-distributed training is that in distributed training multiple processes run in parallel, and under some setups they produce identical results. If every process reports results to the tracking backend, you end up with duplicated entries. To handle this, Ray Train lets you apply the logging logic only on the rank 0 worker, which you can identify with ray.train.get_context().get_world_rank().
from ray import train

def train_func():
    ...
    if train.get_context().get_world_rank() == 0:
        # Add your logging logic only for the rank 0 worker.
        ...
Interacting with the experiment tracking backend inside train_func consists of 4 logical steps:

1. Set up the connection to the tracking backend
2. Configure and launch the run
3. Log metrics
4. Finish the run

The following sections describe each step in more detail.
Step 1: Connect to your tracking backend#
First, decide which tracking backend to use: W&B, MLflow, TensorBoard, Comet, etc. If applicable, make sure that credentials are set up correctly on every training worker.
W&B offers both online and offline modes.
Online
For online mode, because you are logging to W&B's tracking service, make sure that you set the credentials inside train_func. For more information, see Set up credentials.
# This is equivalent to `os.environ["WANDB_API_KEY"] = "your_api_key"`
wandb.login(key="your_api_key")
Offline
For offline mode, because you are logging to the local file system, point the offline directory to a shared storage path that all nodes can write to. For more information, see Set up a shared file system.
os.environ["WANDB_MODE"] = "offline"
wandb.init(dir="some_shared_storage_path/wandb")
MLflow offers both local and remote (for example, to Databricks' MLflow service) modes.
Local
For local mode, because you are logging to the local file system, point the tracking directory to a shared storage path that all nodes can write to. For more information, see Set up a shared file system.
mlflow.set_tracking_uri(uri="file://some_shared_storage_path/mlruns")
mlflow.start_run()
Remote, hosted by Databricks
Make sure that all nodes have access to the Databricks config file. For more information, see Set up credentials.
# The MLflow client looks for a Databricks config file
# at the location specified by `os.environ["DATABRICKS_CONFIG_FILE"]`.
os.environ["DATABRICKS_CONFIG_FILE"] = config["databricks_config_file"]
mlflow.set_tracking_uri("databricks")
mlflow.start_run()
Set up credentials#
Refer to each tracking library's API documentation for how to set up credentials. This step usually involves setting an environment variable or accessing a config file.
The easiest way to pass an environment-variable credential to the training workers is through a runtime environment, which you can initialize with the following code:
import ray
# This makes sure that training workers have the same env var set
ray.init(runtime_env={"env_vars": {"SOME_API_KEY": "your_api_key"}})
For access to a config file, make sure that the file is accessible to all nodes. One way is to set up shared storage; another is to save a copy of the file on each node.
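When neither option is convenient, one workaround is to ship the file's contents to each worker and materialize a per-worker copy inside train_func. A minimal sketch (the helper name and the idea of passing the contents through train_loop_config are illustrative, not a Ray Train API):

```python
import os
import tempfile

def materialize_config(contents: str, filename: str = "databrickscfg") -> str:
    # Write config contents (read once on the driver and shipped to each
    # worker, e.g. via `train_loop_config`) into a per-worker temp file,
    # and return the path to that file.
    path = os.path.join(tempfile.mkdtemp(), filename)
    with open(path, "w") as f:
        f.write(contents)
    return path

# Inside train_func, a worker could then point the client at its own copy:
# os.environ["DATABRICKS_CONFIG_FILE"] = materialize_config(config["cfg_contents"])
```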
Step 2: Configure and start the run#
This step usually involves picking an identifier for the run and associating it with a project. Refer to the tracking library's documentation for the exact semantics.
Tip
When performing fault-tolerant training with auto-restoration, use a consistent ID to configure all tracking runs that logically belong to the same training run.
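One way to obtain such a consistent ID is to derive it deterministically from names that stay stable across restarts. A sketch (the hashing scheme and helper name are assumptions, not part of any tracking library):

```python
import hashlib

def stable_run_id(experiment_name: str, trial_name: str) -> str:
    # A restarted run with the same experiment/trial names maps to the
    # same tracking-run ID, so logs land in one run after auto-resume.
    digest = hashlib.sha256(
        f"{experiment_name}/{trial_name}".encode()
    ).hexdigest()
    return digest[:16]
```

With W&B, for example, such an ID could be passed as `wandb.init(id=stable_run_id(...), resume="allow")`.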
Step 3: Log metrics#
You can customize how to log parameters, metrics, models, or media content inside train_func, just as in a non-distributed training script. You can also use a tracking framework's native integration with a particular training framework, for example mlflow.pytorch.autolog(), lightning.pytorch.loggers.MLFlowLogger, etc.
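The rank check that guards each logging call can be factored into a small helper. This is purely illustrative; `log_fn` stands in for any library call such as `wandb.log` or `mlflow.log_metrics`:

```python
def log_rank0(log_fn, metrics: dict, rank: int) -> None:
    # Forward metrics to the tracking library only on the rank 0 worker,
    # so parallel workers don't produce duplicated entries.
    if rank == 0:
        log_fn(metrics)
```

Inside train_func you would call it as, for example, `log_rank0(wandb.log, {"loss": loss}, ray.train.get_context().get_world_rank())`.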
Step 4: Finish the run#
This step ensures that all logs are synced to the tracking service. Depending on the tracking library's implementation, logs are sometimes cached locally and synced to the tracking service asynchronously. Finishing the run ensures that all logs are synced by the time the training workers exit.
# https://docs.wandb.ai/ref/python/finish
wandb.finish()
# https://mlflow.org/docs/1.2.0/python_api/mlflow.html
mlflow.end_run()
# https://www.comet.com/docs/v2/api-and-sdk/python-sdk/reference/Experiment/#experimentend
Experiment.end()
Examples#
The following are runnable examples for PyTorch and PyTorch Lightning.
PyTorch#
Log to W&B
from filelock import FileLock
import os

import torch
import wandb
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.models import resnet18

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Run the following script with the WANDB_API_KEY env var set.
assert os.environ.get("WANDB_API_KEY", None), "Please set WANDB_API_KEY env var."

# This makes sure that all workers have this env var set.
ray.init(
    runtime_env={"env_vars": {"WANDB_API_KEY": os.environ["WANDB_API_KEY"]}}
)

def train_func(config):
    if ray.train.get_context().get_world_rank() == 0:
        wandb.init()

    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    model = ray.train.torch.prepare_model(model)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.module.parameters(), lr=0.001)

    # Data
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.28604,), (0.32025,))]
    )
    with FileLock("./data.lock"):
        train_data = datasets.FashionMNIST(
            root="./data", train=True, download=True, transform=transform
        )
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(1):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if ray.train.get_context().get_world_rank() == 0:
            wandb.log({"loss": loss, "epoch": epoch})

    if ray.train.get_context().get_world_rank() == 0:
        wandb.finish()

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2),
)
trainer.fit()
Log to file-based MLflow
# Run the following script with the SHARED_STORAGE_PATH env var set.
# The MLflow offline logs are saved to SHARED_STORAGE_PATH/mlruns.
from filelock import FileLock
import mlflow
import os

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
from torchvision import datasets, transforms
from torchvision.models import resnet18
from torch.utils.data import DataLoader

assert os.environ.get(
    "SHARED_STORAGE_PATH", None
), "Please set SHARED_STORAGE_PATH env var."

# Assumes you are passing a `save_dir` in `config`
def train_func(config):
    save_dir = config["save_dir"]

    if ray.train.get_context().get_world_rank() == 0:
        mlflow.set_tracking_uri(f"file:{save_dir}")
        mlflow.set_experiment("my_experiment")
        mlflow.start_run()

    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    model = ray.train.torch.prepare_model(model)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.module.parameters(), lr=0.001)

    # Data
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.28604,), (0.32025,))]
    )
    with FileLock("./data.lock"):
        train_data = datasets.FashionMNIST(
            root="./data", train=True, download=True, transform=transform
        )
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(1):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if ray.train.get_context().get_world_rank() == 0:
            mlflow.log_metrics({"loss": loss.item(), "epoch": epoch})

    if ray.train.get_context().get_world_rank() == 0:
        mlflow.end_run()

trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "save_dir": os.path.join(os.environ["SHARED_STORAGE_PATH"], "mlruns")
    },
    scaling_config=ScalingConfig(num_workers=2),
)
trainer.fit()
PyTorch Lightning#
You can keep using PyTorch Lightning's native Logger integrations with W&B, CometML, MLflow, and TensorBoard when running with Ray Train's TorchTrainer.
The following examples walk you through the process. The code here is runnable.
W&B
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os

import pytorch_lightning as pl
import wandb
from pytorch_lightning.loggers.wandb import WandbLogger

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = WandbLogger(name="demo-run", project="demo-project")

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)

    if ray.train.get_context().get_world_rank() == 0:
        wandb.finish()

scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "WANDB_API_KEY" in os.environ
), 'Please set WANDB_API_KEY="abcde" when running this script.'

# This ensures that all workers have this env var set.
ray.init(
    runtime_env={"env_vars": {"WANDB_API_KEY": os.environ["WANDB_API_KEY"]}}
)

trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
)
trainer.fit()
MLflow
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os

import pytorch_lightning as pl
from pytorch_lightning.loggers.mlflow import MLFlowLogger

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    save_dir = config["save_dir"]
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = MLFlowLogger(
            experiment_name="demo-project",
            tracking_uri=f"file:{save_dir}",
        )

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)

scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "SHARED_STORAGE_PATH" in os.environ
), "Please set SHARED_STORAGE_PATH=/a/b/c when running this script."

trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "save_dir": os.path.join(os.environ["SHARED_STORAGE_PATH"], "mlruns")
    },
    scaling_config=scaling_config,
)
trainer.fit()
Comet
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os

import pytorch_lightning as pl
from pytorch_lightning.loggers.comet import CometLogger

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = CometLogger(api_key=os.environ["COMET_API_KEY"])

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)

scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "COMET_API_KEY" in os.environ
), 'Please set COMET_API_KEY="abcde" when running this script.'

# This makes sure that all workers have this env var set.
ray.init(runtime_env={"env_vars": {"COMET_API_KEY": os.environ["COMET_API_KEY"]}})

trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
)
trainer.fit()
TensorBoard
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os

import pytorch_lightning as pl
from pytorch_lightning.loggers.tensorboard import TensorBoardLogger

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    save_dir = config["save_dir"]
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        # TensorBoardLogger expects a plain filesystem path, not a URI.
        logger = TensorBoardLogger(name="demo-run", save_dir=save_dir)

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)

scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "SHARED_STORAGE_PATH" in os.environ
), "Please set SHARED_STORAGE_PATH=/a/b/c when running this script."

trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "save_dir": os.path.join(os.environ["SHARED_STORAGE_PATH"], "tensorboard")
    },
    scaling_config=scaling_config,
)
trainer.fit()
Common errors#
Missing credentials#
I already ran `wandb login` from the CLI, but I still get
wandb: ERROR api_key not configured (no-tty). call wandb.login(key=[your_api_key]).
This is likely because the W&B credentials are not set up correctly on the worker processes. Make sure you call wandb.login or pass WANDB_API_KEY to every training worker. See Set up credentials for more details.
Missing configurations#
I already ran `databricks configure`, but I still get
databricks_cli.utils.InvalidConfigurationError: You haven't configured the CLI yet!
This usually happens because running databricks configure only generates ~/.databrickscfg on the head node. Move this file to a shared location or copy it to every node. See Set up credentials for more details.