保存数据#
Ray Data 允许你将数据保存到文件或其他 Python 对象中。
本指南将向你展示如何
将数据写入文件#
Ray Data 可以写入本地磁盘和云存储。
将数据写入本地磁盘#
要将你的 Dataset
保存到本地磁盘,调用类似 Dataset.write_parquet
的方法,并使用 local://
方案指定本地目录。
警告
如果你的集群包含多个节点且未使用 local://
方案,Ray Data 会将数据的不同分区写入不同的节点。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.write_parquet("local:///tmp/iris/")
要将数据写入除 Parquet 之外的其他格式,请阅读 输入/输出参考。
将数据写入云存储#
要将你的 Dataset
保存到云存储,请使用你的云服务提供商认证所有节点。然后,调用类似 Dataset.write_parquet
的方法,并使用相应的方案指定 URI。URI 可以指向存储桶或文件夹。
要将数据写入除 Parquet 之外的其他格式,请阅读 输入/输出参考。
要将数据保存到 Amazon S3,请使用 s3://
方案指定 URI。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.write_parquet("s3://my-bucket/my-folder")
Ray Data 依赖 PyArrow 进行 Amazon S3 认证。有关如何配置凭据以与 PyArrow 兼容的更多信息,请参阅其 S3 文件系统文档。
要将数据保存到 Google Cloud Storage,请安装 Google Cloud Storage 的文件系统接口
pip install gcsfs
然后,创建一个 GCSFileSystem
并使用 gcs://
方案指定 URI。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
filesystem = gcsfs.GCSFileSystem(project="my-google-project")
ds.write_parquet("gcs://my-bucket/my-folder", filesystem=filesystem)
Ray Data 依赖 PyArrow 进行 Google Cloud Storage 认证。有关如何配置凭据以与 PyArrow 兼容的更多信息,请参阅其 GCS 文件系统文档。
要将数据保存到 Azure Blob Storage,请安装 Azure-Datalake Gen1 和 Gen2 Storage 的文件系统接口
pip install adlfs
然后,创建一个 AzureBlobFileSystem
并使用 az://
方案指定 URI。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
filesystem = adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
ds.write_parquet("az://my-bucket/my-folder", filesystem=filesystem)
Ray Data 依赖 PyArrow 进行 Azure Blob Storage 认证。有关如何配置凭据以与 PyArrow 兼容的更多信息,请参阅其 fsspec 兼容文件系统文档。
将数据写入 NFS#
要将你的 Dataset
保存到 NFS 文件系统,调用类似 Dataset.write_parquet
的方法,并指定一个挂载目录。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.write_parquet("/mnt/cluster_storage/iris")
要将数据写入除 Parquet 之外的其他格式,请阅读 输入/输出参考。
改变输出文件数量#
当你调用写入方法时,Ray Data 会将你的数据写入多个文件。要控制输出文件的数量,请配置 min_rows_per_file
。
注意
min_rows_per_file
是一个提示,不是严格限制。Ray Data 可能会向每个文件写入更多或更少的行。在底层,如果每个块的行数大于指定值,Ray Data 会将每个块的行数写入每个文件。
import os
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.write_csv("/tmp/few_files/", min_rows_per_file=75)
print(os.listdir("/tmp/few_files/"))
['0_000001_000000.csv', '0_000000_000000.csv', '0_000002_000000.csv']
将 Dataset 转换为其他 Python 库#
将 Dataset 转换为 pandas#
要将 Dataset
转换为 pandas DataFrame,请调用 Dataset.to_pandas()
。你的数据必须能够容纳在头节点的内存中。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
df = ds.to_pandas()
print(df)
sepal length (cm) sepal width (cm) ... petal width (cm) target
0 5.1 3.5 ... 0.2 0
1 4.9 3.0 ... 0.2 0
2 4.7 3.2 ... 0.2 0
3 4.6 3.1 ... 0.2 0
4 5.0 3.6 ... 0.2 0
.. ... ... ... ... ...
145 6.7 3.0 ... 2.3 2
146 6.3 2.5 ... 1.9 2
147 6.5 3.0 ... 2.0 2
148 6.2 3.4 ... 2.3 2
149 5.9 3.0 ... 1.8 2
<BLANKLINE>
[150 rows x 5 columns]
将 Dataset 转换为分布式 DataFrame#
Ray Data 可以与分布式数据处理框架(如 Daft、Dask、Spark、Modin 和 Mars)互操作。
要将 Dataset
转换为 Daft Dataframe,请调用 Dataset.to_daft()
。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
df = ds.to_daft()
要将 Dataset
转换为 Dask DataFrame,请调用 Dataset.to_dask()
。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
df = ds.to_dask()
df
╭───────────────────┬──────────────────┬───────────────────┬──────────────────┬────────╮
│ sepal length (cm) ┆ sepal width (cm) ┆ petal length (cm) ┆ petal width (cm) ┆ target │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ Float64 ┆ Float64 ┆ Float64 ┆ Float64 ┆ Int64 │
╞═══════════════════╪══════════════════╪═══════════════════╪══════════════════╪════════╡
│ 5.1 ┆ 3.5 ┆ 1.4 ┆ 0.2 ┆ 0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 4.9 ┆ 3 ┆ 1.4 ┆ 0.2 ┆ 0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 4.7 ┆ 3.2 ┆ 1.3 ┆ 0.2 ┆ 0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 4.6 ┆ 3.1 ┆ 1.5 ┆ 0.2 ┆ 0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 5 ┆ 3.6 ┆ 1.4 ┆ 0.2 ┆ 0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 5.4 ┆ 3.9 ┆ 1.7 ┆ 0.4 ┆ 0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 4.6 ┆ 3.4 ┆ 1.4 ┆ 0.3 ┆ 0 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 5 ┆ 3.4 ┆ 1.5 ┆ 0.2 ┆ 0 │
╰───────────────────┴──────────────────┴───────────────────┴──────────────────┴────────╯
(Showing first 8 of 150 rows)
要将 Dataset
转换为 Spark DataFrame,请调用 Dataset.to_spark()
。
import ray
import raydp
spark = raydp.init_spark(
app_name = "example",
num_executors = 1,
executor_cores = 4,
executor_memory = "512M"
)
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
df = ds.to_spark(spark)
要将 Dataset
转换为 Modin DataFrame,请调用 Dataset.to_modin()
。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
mdf = ds.to_modin()
要将 Dataset
转换为 Mars DataFrame,请调用 Dataset.to_mars()
。
import ray
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
mdf = ds.to_mars()