Ray Data 快速入门#

开始使用 Ray Data 的 Dataset 抽象进行分布式数据处理。

本指南向您介绍 Ray Data 的核心功能

数据集#

Ray Data 的主要抽象是 Dataset,它代表一个分布式数据集合。Dataset 专门为机器学习工作负载设计,可以高效地处理超出单机内存容量的数据集合。

加载数据#

可以从各种来源创建数据集,包括本地文件、Python 对象以及 S3 或 GCS 等云存储服务。Ray Data 无缝集成 Arrow 支持的任何文件系统

import ray

# Load a CSV dataset directly from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

# Preview the first record
ds.show(limit=1)
{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}

要了解更多关于从不同来源创建数据集的信息,请阅读 加载数据

转换数据#

应用用户自定义函数 (UDF) 来转换数据集。Ray 会自动在集群中并行化这些转换,以获得更好的性能。

from typing import Dict
import numpy as np

# Define a transformation to compute a "petal area" attribute
def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    vec_a = batch["petal length (cm)"]
    vec_b = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = vec_a * vec_b
    return batch

# Apply the transformation to our dataset
transformed_ds = ds.map_batches(transform_batch)

# View the updated schema with the new column
# .materialize() will execute all the lazy transformations and
# materialize the dataset into object store memory
print(transformed_ds.materialize())
MaterializedDataset(
   num_blocks=...,
   num_rows=150,
   schema={
      sepal length (cm): double,
      sepal width (cm): double,
      petal length (cm): double,
      petal width (cm): double,
      target: int64,
      petal area (cm^2): double
   }
)

要探索更多转换功能,请阅读 转换数据

消费数据#

通过方便的方法如 take_batch()iter_batches() 访问数据集内容。您也可以将数据集直接传递给 Ray Tasks 或 Actors 进行分布式处理。

# Extract the first 3 rows as a batch for processing
print(transformed_ds.take_batch(batch_size=3))
{'sepal length (cm)': array([5.1, 4.9, 4.7]),
    'sepal width (cm)': array([3.5, 3. , 3.2]),
    'petal length (cm)': array([1.4, 1.4, 1.3]),
    'petal width (cm)': array([0.2, 0.2, 0.2]),
    'target': array([0, 0, 0]),
    'petal area (cm^2)': array([0.28, 0.28, 0.26])}

有关处理数据集内容的更多详细信息,请参阅 迭代数据保存数据

保存数据#

使用 write_parquet()write_csv() 等方法将处理后的数据集导出到各种格式和存储位置。

import os

# Save the transformed dataset as Parquet files
transformed_ds.write_parquet("/tmp/iris")

# Verify the files were created
print(os.listdir("/tmp/iris"))
['..._000000.parquet', '..._000001.parquet']

有关保存数据集的更多信息,请参阅 保存数据