保存数据#

Ray Data 允许您将数据保存到文件或其他 Python 对象中。

本指南将向您展示如何

将数据写入文件#

Ray Data 可以写入本地磁盘和云存储。

将数据写入本地磁盘#

要将您的 Dataset 保存到本地磁盘,请调用诸如 Dataset.write_parquet 之类的方法,并使用 local:// 方案指定本地目录。

警告

如果您的集群包含多个节点,并且您没有使用 local://,Ray Data 会将不同数据分区写入不同节点。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

ds.write_parquet("local:///tmp/iris/")

要将数据写入 Parquet 以外的其他格式,请参阅 输入/输出参考

将数据写入云存储#

要将您的 Dataset 保存到云存储,请先使用您的云服务提供商对所有节点进行身份验证。然后,调用诸如 Dataset.write_parquet 之类的方法,并使用适当的方案指定 URI。URI 可以指向存储桶或文件夹。

要将数据写入 Parquet 以外的其他格式,请参阅 输入/输出参考

要将数据保存到 Amazon S3,请使用 s3:// 方案指定 URI。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

ds.write_parquet("s3://my-bucket/my-folder")

Ray Data 依赖 PyArrow 来进行 Amazon S3 身份验证。有关如何配置凭据以与 PyArrow 兼容的更多信息,请参阅其 S3 Filesystem 文档

要将数据保存到 Google Cloud Storage,请安装 Google Cloud Storage 的文件系统接口

pip install gcsfs

然后,创建一个 GCSFileSystem 并使用 gcs:// 方案指定 URI。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

filesystem = gcsfs.GCSFileSystem(project="my-google-project")
ds.write_parquet("gcs://my-bucket/my-folder", filesystem=filesystem)

Ray Data 依赖 PyArrow 来进行 Google Cloud Storage 身份验证。有关如何配置凭据以与 PyArrow 兼容的更多信息,请参阅其 GCS Filesystem 文档

要将数据保存到 Azure Blob Storage,请安装 Azure-Datalake Gen1 and Gen2 Storage 的文件系统接口

pip install adlfs

然后,创建一个 AzureBlobFileSystem 并使用 az:// 方案指定 URI。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

filesystem = adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
ds.write_parquet("az://my-bucket/my-folder", filesystem=filesystem)

Ray Data 依赖 PyArrow 来进行 Azure Blob Storage 身份验证。有关如何配置凭据以与 PyArrow 兼容的更多信息,请参阅其 fsspec 兼容文件系统文档

将数据写入 NFS#

要将您的 Dataset 保存到 NFS 文件系统,请调用诸如 Dataset.write_parquet 之类的方法,并指定一个已挂载的目录。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

ds.write_parquet("/mnt/cluster_storage/iris")

要将数据写入 Parquet 以外的其他格式,请参阅 输入/输出参考

更改输出文件的数量#

调用写入方法时,Ray Data 会将您的数据写入多个文件。要控制输出文件的数量,请配置 min_rows_per_file

注意

min_rows_per_file 是一个提示,不是严格的限制。Ray Data 可能会为每个文件写入更多或更少的行。在底层,如果每个块的行数大于指定值,Ray Data 会将每个块的行数写入每个文件。

import os
import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.write_csv("/tmp/few_files/", min_rows_per_file=75)

print(os.listdir("/tmp/few_files/"))
['0_000001_000000.csv', '0_000000_000000.csv', '0_000002_000000.csv']

写入分区数据集#

写入分区数据集(使用 Hive 风格、基于文件夹的分区)时,建议在写入之前按分区列对数据集进行重新分区。这可以让您*控制文件大小和数量*。当数据集按分区列重新分区后,每个块应包含与特定分区对应的所有行,这意味着创建的文件数量应基于为 write_parquet 方法等配置的参数(如 min_rows_per_filemax_rows_per_file)来控制。由于每个块都是独立写入的,因此在不预先重新分区的情况下写入数据集,可能会为每个分区创建 N 个文件(其中 N 是数据集中的块数),而控制文件数量和大小的能力非常有限(因为每个块可能包含对应于任何分区的行)。

def print_directory_tree(start_path: str) -> None:
    """
    Prints the directory tree structure starting from the given path.
    """
    for root, dirs, files in os.walk(start_path):
        level = root.replace(start_path, '').count(os.sep)
        indent = ' ' * 4 * (level)
        print(f'{indent}{os.path.basename(root)}/')
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print(f'{subindent}{f}')

# Sample dataset that we’ll partition by ``city`` and ``year``.
df = pd.DataFrame(
    {
        "city": ["SF", "SF", "NYC", "NYC", "SF", "NYC", "SF", "NYC"],
        "year": [2023, 2024, 2023, 2024, 2023, 2023, 2024, 2024],
        "sales": [100, 120, 90, 115, 105, 95, 130, 110],
    }
)

ds = ray.data.from_pandas(df)
DataContext.shuffle_strategy=ShuffleStrategy.HASH_SHUFFLE

# ── Partitioned write ──────────────────────────────────────────────────────
# 1. Repartition so all rows with the same (city, year) land in the same
#    block – this minimises shuffling during the write.
# 2. Pass the same columns to ``partition_cols`` so Ray creates a
#    Hive-style directory layout:  city=<value>/year=<value>/....
# 3. Use ``min_rows_per_file`` / ``max_rows_per_file`` to control how many
#    rows Ray puts in each Parquet file.
ds.repartition(keys=["city", "year"], num_blocks=4).write_parquet(
    "/tmp/sales_partitioned",
    partition_cols=["city", "year"],
    min_rows_per_file=2,     # At least 2 rows in each file …
    max_rows_per_file=3,     # … but never more than 3.
)

print_directory_tree("/tmp/sales_partitioned")
sales_partitioned/
    city=NYC/
        year=2024/
            1_a2b8b82cd2904a368ec39f42ae3cf830_000000_000000-0.parquet
        year=2023/
            1_a2b8b82cd2904a368ec39f42ae3cf830_000001_000000-0.parquet
    city=SF/
        year=2024/
            1_a2b8b82cd2904a368ec39f42ae3cf830_000000_000000-0.parquet
        year=2023/
            1_a2b8b82cd2904a368ec39f42ae3cf830_000001_000000-0.parquet

将数据集转换为其他 Python 库

将数据集转换为 pandas

要将 Dataset 转换为 pandas DataFrame,请调用 Dataset.to_pandas()。您的数据必须适合内存中主节点的大小。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

df = ds.to_pandas()
print(df)
     sepal length (cm)  sepal width (cm)  ...  petal width (cm)  target
0                  5.1               3.5  ...               0.2       0
1                  4.9               3.0  ...               0.2       0
2                  4.7               3.2  ...               0.2       0
3                  4.6               3.1  ...               0.2       0
4                  5.0               3.6  ...               0.2       0
..                 ...               ...  ...               ...     ...
145                6.7               3.0  ...               2.3       2
146                6.3               2.5  ...               1.9       2
147                6.5               3.0  ...               2.0       2
148                6.2               3.4  ...               2.3       2
149                5.9               3.0  ...               1.8       2
<BLANKLINE>
[150 rows x 5 columns]

将数据集转换为分布式 DataFrames

Ray Data 与 Dask、Spark、Modin 和 Mars 等分布式数据处理框架以及 Daft 互操作。

要将 Dataset 转换为 Daft DataFrame,请调用 Dataset.to_daft()

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

df = ds.to_daft()
print(df)
╭───────────────────┬──────────────────┬───────────────────┬──────────────────┬────────╮
│ sepal length (cm) ┆ sepal width (cm) ┆ petal length (cm) ┆ petal width (cm) ┆ target │
│ ---               ┆ ---              ┆ ---               ┆ ---              ┆ ---    │
│ Float64           ┆ Float64          ┆ Float64           ┆ Float64          ┆ Int64  │
╞═══════════════════╪══════════════════╪═══════════════════╪══════════════════╪════════╡
│ 5.1               ┆ 3.5              ┆ 1.4               ┆ 0.2              ┆ 0      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 4.9               ┆ 3                ┆ 1.4               ┆ 0.2              ┆ 0      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 4.7               ┆ 3.2              ┆ 1.3               ┆ 0.2              ┆ 0      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 4.6               ┆ 3.1              ┆ 1.5               ┆ 0.2              ┆ 0      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 5                 ┆ 3.6              ┆ 1.4               ┆ 0.2              ┆ 0      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 5.4               ┆ 3.9              ┆ 1.7               ┆ 0.4              ┆ 0      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 4.6               ┆ 3.4              ┆ 1.4               ┆ 0.3              ┆ 0      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 5                 ┆ 3.4              ┆ 1.5               ┆ 0.2              ┆ 0      │
╰───────────────────┴──────────────────┴───────────────────┴──────────────────┴────────╯

(Showing first 8 of 150 rows)

要将 Dataset 转换为 Dask DataFrame,请调用 Dataset.to_dask()

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

df = ds.to_dask()

要将 Dataset 转换为 Spark DataFrame,请调用 Dataset.to_spark()

import ray
import raydp

spark = raydp.init_spark(
    app_name = "example",
    num_executors = 1,
    executor_cores = 4,
    executor_memory = "512M"
)

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
df = ds.to_spark(spark)

要将 Dataset 转换为 Modin DataFrame,请调用 Dataset.to_modin()

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

mdf = ds.to_modin()

要将 Dataset 从 Mars DataFrame 转换,请调用 Dataset.to_mars()

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

mdf = ds.to_mars()