Ray Data 快速入门#
开始使用 Ray Data 的 Dataset
抽象进行分布式数据处理。
本指南向您介绍 Ray Data 的核心功能
数据集#
Ray Data 的主要抽象是 Dataset
,它代表一个分布式数据集合。Dataset 专门为机器学习工作负载设计,可以高效地处理超出单机内存容量的数据集合。
加载数据#
可以从各种来源创建数据集,包括本地文件、Python 对象以及 S3 或 GCS 等云存储服务。Ray Data 无缝集成 Arrow 支持的任何文件系统。
import ray
# Load a CSV dataset directly from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# Preview the first record
ds.show(limit=1)
{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
要了解更多关于从不同来源创建数据集的信息,请阅读 加载数据。
转换数据#
应用用户自定义函数 (UDF) 来转换数据集。Ray 会自动在集群中并行化这些转换,以获得更好的性能。
from typing import Dict
import numpy as np
# Define a transformation to compute a "petal area" attribute
def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
vec_a = batch["petal length (cm)"]
vec_b = batch["petal width (cm)"]
batch["petal area (cm^2)"] = vec_a * vec_b
return batch
# Apply the transformation to our dataset
transformed_ds = ds.map_batches(transform_batch)
# View the updated schema with the new column
# .materialize() will execute all the lazy transformations and
# materialize the dataset into object store memory
print(transformed_ds.materialize())
MaterializedDataset(
num_blocks=...,
num_rows=150,
schema={
sepal length (cm): double,
sepal width (cm): double,
petal length (cm): double,
petal width (cm): double,
target: int64,
petal area (cm^2): double
}
)
要探索更多转换功能,请阅读 转换数据。
消费数据#
通过方便的方法如 take_batch()
和 iter_batches()
访问数据集内容。您也可以将数据集直接传递给 Ray Tasks 或 Actors 进行分布式处理。
# Extract the first 3 rows as a batch for processing
print(transformed_ds.take_batch(batch_size=3))
{'sepal length (cm)': array([5.1, 4.9, 4.7]),
'sepal width (cm)': array([3.5, 3. , 3.2]),
'petal length (cm)': array([1.4, 1.4, 1.3]),
'petal width (cm)': array([0.2, 0.2, 0.2]),
'target': array([0, 0, 0]),
'petal area (cm^2)': array([0.28, 0.28, 0.26])}
保存数据#
使用 write_parquet()
、write_csv()
等方法将处理后的数据集导出到各种格式和存储位置。
import os
# Save the transformed dataset as Parquet files
transformed_ds.write_parquet("/tmp/iris")
# Verify the files were created
print(os.listdir("/tmp/iris"))
['..._000000.parquet', '..._000001.parquet']
有关保存数据集的更多信息,请参阅 保存数据。