数据转换#

转换可让您处理和修改数据集。您可以组合转换来表达计算链。

注意

转换默认是惰性的。它们不会被执行,直到您通过迭代 Dataset保存 Dataset检查 Dataset 属性 来触发数据消耗。

本指南将向您展示如何

转换行#

提示

如果您的转换是向量化的,请调用map_batches() 以获得更好的性能。要了解更多信息,请参阅转换批次

使用 map 转换行#

如果您的转换对每个输入行返回正好一个行,请调用map()

import os
from typing import Any, Dict
import ray

def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
    row["filename"] = os.path.basename(row["path"])
    return row

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
    .map(parse_filename)
)

传递给map()的用户定义函数应为Callable[[Dict[str, Any]], Dict[str, Any]] 类型。换句话说,您的函数应输入和输出一个字典,其中键是字符串,值是任何类型。例如

from typing import Any, Dict

def fn(row: Dict[str, Any]) -> Dict[str, Any]:
    # access row data
    value = row["col1"]

    # add data to row
    row["col2"] = ...

    # return row
    return row

使用 flat map 转换行#

如果您的转换对每个输入行返回多个行,请调用flat_map()

from typing import Any, Dict, List
import ray

def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    return [row] * 2

print(
    ray.data.range(3)
    .flat_map(duplicate_row)
    .take_all()
)
[{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]

传递给flat_map()的用户定义函数应为Callable[[Dict[str, Any]], List[Dict[str, Any]]] 类型。换句话说,您的函数应该输入一个字典,其中键是字符串,值是任何类型,并输出一个与输入具有相同类型的字典列表,例如

from typing import Any, Dict, List

def fn(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    # access row data
    value = row["col1"]

    # add data to row
    row["col2"] = ...

    # construct output list
    output = [row, row]

    # return list of output rows
    return output

转换批次#

如果您的转换可以使用 NumPy、PyArrow 或 Pandas 操作进行向量化,则转换批次比转换单个行具有更高的性能。

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = np.clip(batch["image"] + 4, 0, 255)
    return batch

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .map_batches(increase_brightness)
)

配置批次格式#

Ray Data 将批次表示为 NumPy ndarray、Pandas DataFrame 或 Arrow Table 的字典。默认情况下,Ray Data 将批次表示为 NumPy ndarray 的字典。要配置批次类型,请在map_batches() 中指定 batch_format。您可以从您的函数返回任何一种格式,但 batch_format 应与您函数的输入匹配。

在将转换应用于行批次时,Ray Data 可以将这些批次表示为 NumPy 的 ndarrays、Pandas DataFrame 或 PyArrow Table

使用
  • batch_format=numpy 时,函数的输入将是一个字典,其中键对应于列名,值对应于表示为 ndarrays 的列值。

  • batch_format=pyarrow 时,函数的输入将是一个 Pyarrow Table

  • batch_format=pandas 时,函数的输入将是一个 Pandas DataFrame

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = np.clip(batch["image"] + 4, 0, 255)
    return batch

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .map_batches(increase_brightness, batch_format="numpy")
)
import pandas as pd
import ray

def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
    return batch.dropna()

ds = (
    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    .map_batches(drop_nas, batch_format="pandas")
)
import pyarrow as pa
import pyarrow.compute as pc
import ray

def drop_nas(batch: pa.Table) -> pa.Table:
    return pc.drop_null(batch)

ds = (
    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    .map_batches(drop_nas, batch_format="pyarrow")
)

用户定义的函数也可以是一个产生批次的 Python 生成器,因此函数也可以是 Callable[DataBatch, Iterator[[DataBatch]] 类型,其中 DataBatch = Union[pd.DataFrame, Dict[str, np.ndarray]]。在这种情况下,您的函数看起来会像

from typing import Dict, Iterator
import numpy as np

def fn(batch: Dict[str, np.ndarray]) -> Iterator[Dict[str, np.ndarray]]:
    # yield the same batch multiple times
    for _ in range(10):
        yield batch

选择正确的批次格式#

为您的 map_batches 选择合适的批次格式时,主要考虑的是便利性与性能之间的权衡。

  1. 批次是底层块的滑动窗口:UDF 使用构成当前指定 batch_size 的批次的底层块的行的子集进行调用。指定 batch_size=None 会使批次包含块中的所有行,形成一个批次。

  2. 根据批次格式,这种视图可以是零拷贝(当批次格式与pandaspyarrow 的块类型匹配时)或拷贝一次(当批次格式与块类型不同时)。

例如,如果底层块类型是 Arrow,指定 batch_format="numpy"batch_format="pandas" 可能会在从底层块类型转换时引起底层数据的拷贝。

Ray Data 还致力于最小化数据转换量:例如,如果您的 map_batches 操作返回 Pandas 批次,那么这些批次将在进行转换的情况下合并为块,并作为 Pandas 块进一步传播。大多数 Ray Data 数据源会产生 Arrow 块,因此使用批次格式 pyarrow 可以避免不必要的数据转换。

如果您希望使用更符合人体工程学的转换 API 但又想避免性能开销,可以考虑在您的 map_batches 操作中使用 polars,并设置 batch_format="pyarrow",如下所示

import pyarrow as pa

def udf(table: pa.Table):
    import polars as pl
    df = polars.from_pyarrow(table)
    df.summary()
    return df.to_arrow()

ds.map_batches(udf, batch_format="pyarrow")

配置批次大小#

增加 batch_size 可以提高向量化转换的性能以及模型推理的性能。但是,如果您的批次大小太大,您的程序可能会遇到内存不足(OOM)的错误。

如果您遇到 OOM 错误,请尝试减小您的 batch_size

行的顺序#

在转换数据时,的顺序默认不被保留。

如果需要保留/确定块的顺序,您可以使用 sort() 方法,或将 ray.data.ExecutionOptions.preserve_order 设置为 True。请注意,设置此标志可能会对较大的集群设置产生负面影响,因为在这些设置中,拖后腿的节点更可能出现。

import ray

ctx = ray.data.DataContext().get_current()

# By default, this is set to False.
ctx.execution_options.preserve_order = True

有状态转换#

如果您的转换需要昂贵的设置,例如下载模型权重,请使用可调用的 Python 类而不是函数来使转换具有状态。当使用 Python 类时,__init__ 方法会在每个工作节点上恰好调用一次以执行设置。相比之下,函数是无状态的,因此任何设置都必须为每个数据项执行。

内部来说,Ray Data 使用任务来执行函数,并使用 actor 来执行类。要了解更多关于任务和 actor 的信息,请阅读Ray Core 核心概念

要使用 Python 类转换数据,请完成以下步骤

  1. 实现一个类。在 __init__ 中执行设置,并在 __call__ 中转换数据。

  2. 调用 map_batches()map()flat_map()。使用 compute 参数传递一个计算策略来控制 Ray 使用多少个工作节点。每个工作节点并行处理数据分区。使用 ray.data.TaskPoolStrategy(size=n) 来限制并发任务的数量,或使用 ray.data.ActorPoolStrategy(...) 在固定或自动缩放的 actor 池上运行可调用类。

from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32)
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(
        TorchPredictor,
        compute=ray.data.ActorPoolStrategy(size=2),
    )
)
from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity().cuda()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().cpu().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(
        TorchPredictor,
        # Two workers with one GPU each
        compute=ray.data.ActorPoolStrategy(size=2),
        # Batch size is required if you're using GPUs.
        batch_size=4,
        num_gpus=1
    )
)

避免内存不足错误#

如果您的用户定义函数使用了大量内存,您可能会遇到内存不足的错误。为避免这些错误,请配置 memory 参数。它告诉 Ray 您的函数使用了多少内存,并防止 Ray 在节点上调度过多的任务。

def uses_lots_of_memory(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    ...

# Tell Ray that the function uses 1 GiB of memory
ds.map_batches(uses_lots_of_memory, memory=1 * 1024 * 1024)

分组和转换组#

要转换组,请调用 groupby() 以根据提供的 key 列值对行进行分组。然后,调用 map_groups() 来对每个组执行转换。

from typing import Dict
import numpy as np
import ray

items = [
    {"image": np.zeros((32, 32, 3)), "label": label}
    for _ in range(10) for label in range(100)
]

def normalize_images(group: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    group["image"] = (group["image"] - group["image"].mean()) / group["image"].std()
    return group

ds = (
    ray.data.from_items(items)
    .groupby("label")
    .map_groups(normalize_images)
)
import pandas as pd
import ray

def normalize_features(group: pd.DataFrame) -> pd.DataFrame:
    target = group.drop("target")
    group = (group - group.min()) / group.std()
    group["target"] = target
    return group

ds = (
    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    .groupby("target")
    .map_groups(normalize_features)
)