转换数据#
转换允许您处理和修改数据集。您可以组合转换来表达计算链。
本指南将向您展示如何
转换行#
提示
如果您的转换是向量化的,为了获得更好的性能,请调用map_batches()
。要了解更多信息,请参阅转换批次。
使用 map 转换行#
如果您的转换对每个输入行返回恰好一行,请调用map()
。
import os
from typing import Any, Dict
import ray
def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
row["filename"] = os.path.basename(row["path"])
return row
ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
.map(parse_filename)
)
传递给map()
的用户定义函数类型应为 Callable[[Dict[str, Any]], Dict[str, Any]]
。换句话说,您的函数应该输入和输出一个字典,其键是字符串,值可以是任何类型。例如
from typing import Any, Dict
def fn(row: Dict[str, Any]) -> Dict[str, Any]:
# access row data
value = row["col1"]
# add data to row
row["col2"] = ...
# return row
return row
使用 flat_map 转换行#
如果您的转换对每个输入行返回多行,请调用flat_map()
。
from typing import Any, Dict, List
import ray
def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
return [row] * 2
print(
ray.data.range(3)
.flat_map(duplicate_row)
.take_all()
)
[{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]
传递给flat_map()
的用户定义函数类型应为Callable[[Dict[str, Any]], List[Dict[str, Any]]]
。换句话说,您的函数应该输入一个键为字符串、值为任何类型的字典,并输出一个字典列表,其类型与输入相同。例如
from typing import Any, Dict, List
def fn(row: Dict[str, Any]) -> List[Dict[str, Any]]:
# access row data
value = row["col1"]
# add data to row
row["col2"] = ...
# construct output list
output = [row, row]
# return list of output rows
return output
转换批次#
如果您的转换是向量化的,例如大多数 NumPy 或 pandas 操作,转换批次比转换行性能更高。
from typing import Dict
import numpy as np
import ray
def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["image"] = np.clip(batch["image"] + 4, 0, 255)
return batch
ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
.map_batches(increase_brightness)
)
配置批次格式#
Ray Data 将批次表示为 NumPy ndarray 或 pandas DataFrame 的字典。默认情况下,Ray Data 将批次表示为 NumPy ndarray 的字典。要配置批次类型,请在map_batches()
中指定 batch_format
。您的函数可以返回任何一种格式,但 batch_format
应与您函数的输入匹配。
from typing import Dict
import numpy as np
import ray
def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["image"] = np.clip(batch["image"] + 4, 0, 255)
return batch
ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
.map_batches(increase_brightness, batch_format="numpy")
)
import pandas as pd
import ray
def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
return batch.dropna()
ds = (
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
.map_batches(drop_nas, batch_format="pandas")
)
您传递给map_batches()
的用户定义函数更加灵活。因为您可以通过多种方式表示批次(参阅配置批次格式),所以函数类型应为 Callable[DataBatch, DataBatch]
,其中 DataBatch = Union[pd.DataFrame, Dict[str, np.ndarray]]
。换句话说,您的函数应该以一批数据作为输入和输出,这批数据可以表示为 pandas DataFrame 或键为字符串、值为 NumPy ndarray 的字典。例如,您的函数可能看起来像
import pandas as pd
def fn(batch: pd.DataFrame) -> pd.DataFrame:
# modify batch
batch = ...
# return batch
return batch
用户定义函数也可以是一个 Python 生成器,它生成批次,因此函数类型也可以是 Callable[DataBatch, Iterator[[DataBatch]]
,其中 DataBatch = Union[pd.DataFrame, Dict[str, np.ndarray]]
。在这种情况下,您的函数会看起来像
from typing import Dict, Iterator
import numpy as np
def fn(batch: Dict[str, np.ndarray]) -> Iterator[Dict[str, np.ndarray]]:
# yield the same batch multiple times
for _ in range(10):
yield batch
配置批次大小#
增加 batch_size
可以提高向量化转换(如 NumPy 函数和模型推理)的性能。但是,如果批次大小过大,您的程序可能会内存不足。如果遇到内存不足错误,请减小 batch_size
。
行的排序#
转换数据时,块 的顺序默认不保留。
如果需要保留/确定块的顺序,可以使用 sort()
方法,或将 ray.data.ExecutionOptions.preserve_order
设置为 True
。请注意,在较大的集群设置中,设置此标志可能会对性能产生负面影响,因为在这些设置中出现慢节点的可能性更大。
import ray
ctx = ray.data.DataContext().get_current()
# By default, this is set to False.
ctx.execution_options.preserve_order = True
有状态转换#
如果您的转换需要耗时的设置,例如下载模型权重,请使用可调用 Python 类而不是函数来使转换有状态。当使用 Python 类时,会在每个 worker 上调用 __init__
方法执行设置,且仅执行一次。相比之下,函数是无状态的,因此必须为每个数据项执行任何设置。
在内部,Ray Data 使用任务执行函数,使用 actor 执行类。要了解更多关于任务和 actor 的信息,请阅读Ray Core 关键概念。
要使用 Python 类转换数据,请完成以下步骤
实现一个类。在
__init__
中执行设置,在__call__
中转换数据。调用
map_batches()
、map()
或flat_map()
。使用concurrency
参数传递要使用的并发 worker 数量。每个 worker 并行转换一部分数据。固定并发 worker 数量可获得最可预测的性能,但您也可以传递一个(min, max)
元组,让 Ray Data 自动伸缩并发 worker 的数量。
from typing import Dict
import numpy as np
import torch
import ray
class TorchPredictor:
def __init__(self):
self.model = torch.nn.Identity()
self.model.eval()
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
inputs = torch.as_tensor(batch["data"], dtype=torch.float32)
with torch.inference_mode():
batch["output"] = self.model(inputs).detach().numpy()
return batch
ds = (
ray.data.from_numpy(np.ones((32, 100)))
.map_batches(TorchPredictor, concurrency=2)
)
from typing import Dict
import numpy as np
import torch
import ray
class TorchPredictor:
def __init__(self):
self.model = torch.nn.Identity().cuda()
self.model.eval()
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
with torch.inference_mode():
batch["output"] = self.model(inputs).detach().cpu().numpy()
return batch
ds = (
ray.data.from_numpy(np.ones((32, 100)))
.map_batches(
TorchPredictor,
# Two workers with one GPU each
concurrency=2,
# Batch size is required if you're using GPUs.
batch_size=4,
num_gpus=1
)
)
避免内存不足错误#
如果您的用户定义函数使用大量内存,您可能会遇到内存不足错误。为避免这些错误,请配置 memory
参数。它会告诉 Ray 您的函数使用了多少内存,并阻止 Ray 在一个节点上调度太多任务。
def uses_lots_of_memory(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
...
# Tell Ray that the function uses 1 GiB of memory
ds.map_batches(uses_lots_of_memory, memory=1 * 1024 * 1024)
按组分组和转换组#
要转换组,请调用 groupby()
对行进行分组。然后,调用 map_groups()
来转换这些组。
from typing import Dict
import numpy as np
import ray
items = [
{"image": np.zeros((32, 32, 3)), "label": label}
for _ in range(10) for label in range(100)
]
def normalize_images(group: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
group["image"] = (group["image"] - group["image"].mean()) / group["image"].std()
return group
ds = (
ray.data.from_items(items)
.groupby("label")
.map_groups(normalize_images)
)
import pandas as pd
import ray
def normalize_features(group: pd.DataFrame) -> pd.DataFrame:
target = group.drop("target")
group = (group - group.min()) / group.std()
group["target"] = target
return group
ds = (
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
.groupby("target")
.map_groups(normalize_features)
)