Ray Data 快速入门#

开始使用 Ray Data 的 Dataset 抽象进行分布式数据处理。

本指南将向您介绍 Ray Data 的核心功能

数据集#

Ray Data 的核心抽象是 Dataset,它表示分布式数据集合。Datasets 专为机器学习工作负载而设计,可以有效地处理超出单机内存容量的数据集合。

加载数据#

可以从各种来源创建 Datasets,包括本地文件、Python 对象以及 S3 或 GCS 等云存储服务。Ray Data 与任何 Arrow 支持的文件系统 无缝集成。

import ray

# Load a CSV dataset directly from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

# Preview the first record
ds.show(limit=1)
{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}

要了解有关从不同来源创建 Datasets 的更多信息,请阅读 加载数据

转换数据#

应用用户定义的函数 (UDFs) 来转换 Datasets。Ray 会自动在您的集群中并行化这些转换,以提高性能。

from typing import Dict
import numpy as np

# Define a transformation to compute a "petal area" attribute
def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    vec_a = batch["petal length (cm)"]
    vec_b = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = vec_a * vec_b
    return batch

# Apply the transformation to our dataset
transformed_ds = ds.map_batches(transform_batch)

# View the updated schema with the new column
# .materialize() will execute all the lazy transformations and
# materialize the dataset into object store memory
print(transformed_ds.materialize())
MaterializedDataset(
   num_blocks=...,
   num_rows=150,
   schema={
      sepal length (cm): double,
      sepal width (cm): double,
      petal length (cm): double,
      petal width (cm): double,
      target: int64,
      petal area (cm^2): double
   }
)

要探索更多转换功能,请阅读 转换数据

消费数据#

通过方便的方法(如 take_batch()iter_batches())访问 Dataset 内容。您还可以直接将 Datasets 传递给 Ray Tasks 或 Actors 进行分布式处理。

# Extract the first 3 rows as a batch for processing
print(transformed_ds.take_batch(batch_size=3))
{'sepal length (cm)': array([5.1, 4.9, 4.7]),
    'sepal width (cm)': array([3.5, 3. , 3.2]),
    'petal length (cm)': array([1.4, 1.4, 1.3]),
    'petal width (cm)': array([0.2, 0.2, 0.2]),
    'target': array([0, 0, 0]),
    'petal area (cm^2)': array([0.28, 0.28, 0.26])}

有关处理 Dataset 内容的更多详细信息,请参阅 迭代数据保存数据

保存数据#

使用 write_parquet()write_csv() 等方法将处理后的 Datasets 导出到各种格式和存储位置。

import os

# Save the transformed dataset as Parquet files
transformed_ds.write_parquet("/tmp/iris")

# Verify the files were created
print(os.listdir("/tmp/iris"))
['..._000000.parquet', '..._000001.parquet']

有关保存 Datasets 的更多信息,请参阅 保存数据