Ray Data 快速入门#
开始使用 Ray Data 的 Dataset 抽象进行分布式数据处理。
本指南将向您介绍 Ray Data 的核心功能
数据集#
Ray Data 的核心抽象是 Dataset,它表示分布式数据集合。Datasets 专为机器学习工作负载而设计,可以有效地处理超出单机内存容量的数据集合。
加载数据#
可以从各种来源创建 Datasets,包括本地文件、Python 对象以及 S3 或 GCS 等云存储服务。Ray Data 与任何 Arrow 支持的文件系统 无缝集成。
import ray
# Load a CSV dataset directly from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# Preview the first record
ds.show(limit=1)
{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
要了解有关从不同来源创建 Datasets 的更多信息,请阅读 加载数据。
转换数据#
应用用户定义的函数 (UDFs) 来转换 Datasets。Ray 会自动在您的集群中并行化这些转换,以提高性能。
from typing import Dict
import numpy as np
# Define a transformation to compute a "petal area" attribute
def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
vec_a = batch["petal length (cm)"]
vec_b = batch["petal width (cm)"]
batch["petal area (cm^2)"] = vec_a * vec_b
return batch
# Apply the transformation to our dataset
transformed_ds = ds.map_batches(transform_batch)
# View the updated schema with the new column
# .materialize() will execute all the lazy transformations and
# materialize the dataset into object store memory
print(transformed_ds.materialize())
MaterializedDataset(
num_blocks=...,
num_rows=150,
schema={
sepal length (cm): double,
sepal width (cm): double,
petal length (cm): double,
petal width (cm): double,
target: int64,
petal area (cm^2): double
}
)
要探索更多转换功能,请阅读 转换数据。
消费数据#
通过方便的方法(如 take_batch() 和 iter_batches())访问 Dataset 内容。您还可以直接将 Datasets 传递给 Ray Tasks 或 Actors 进行分布式处理。
# Extract the first 3 rows as a batch for processing
print(transformed_ds.take_batch(batch_size=3))
{'sepal length (cm)': array([5.1, 4.9, 4.7]),
'sepal width (cm)': array([3.5, 3. , 3.2]),
'petal length (cm)': array([1.4, 1.4, 1.3]),
'petal width (cm)': array([0.2, 0.2, 0.2]),
'target': array([0, 0, 0]),
'petal area (cm^2)': array([0.28, 0.28, 0.26])}
保存数据#
使用 write_parquet()、write_csv() 等方法将处理后的 Datasets 导出到各种格式和存储位置。
import os
# Save the transformed dataset as Parquet files
transformed_ds.write_parquet("/tmp/iris")
# Verify the files were created
print(os.listdir("/tmp/iris"))
['..._000000.parquet', '..._000001.parquet']
有关保存 Datasets 的更多信息,请参阅 保存数据。