高级:读取和写入自定义文件类型#

本指南将向你展示如何扩展 Ray Data 以读取和写入 Ray 原生不支持的文件类型。这是一份高级指南,你将使用不稳定的内部 API。

Ray Data 已通过 read_images()write_images() API 支持图像读写,但本示例将演示如何实现这些功能,仅为说明目的。

从文件读取数据#

提示

如果你不打算贡献 Ray Data,则无需创建 Datasource。你可以直接调用 read_binary_files() 并使用 map() 解码文件。

用于读取文件的核心抽象是 FileBasedDatasource。它在 Datasource 接口之上提供了特定于文件的功能。

要继承 FileBasedDatasource,需要实现构造函数和 _read_stream 方法。

实现构造函数#

调用超类构造函数并指定你要读取的文件。可选地,指定有效的文件扩展名。Ray Data 会忽略具有其他扩展名的文件。

from ray.data.datasource import FileBasedDatasource

class ImageDatasource(FileBasedDatasource):
    def __init__(self, paths: Union[str, List[str]], *, mode: str):
        super().__init__(
            paths,
            file_extensions=["png", "jpg", "jpeg", "bmp", "gif", "tiff"],
        )

        self.mode = mode  # Specify read options in the constructor

实现 _read_stream 方法#

_read_stream 是一个生成器,它从文件中生成一个或多个数据块。

块 (Blocks) 是 Ray Data 内部用于表示一组行数据的抽象。它们可以是 PyArrow 表、pandas DataFrames 或 NumPy 数组的字典。

不要直接创建块。而是将数据行添加到 DelegatingBlockBuilder 中。

    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        import io
        import numpy as np
        from PIL import Image
        from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder

        data = f.readall()
        image = Image.open(io.BytesIO(data))
        image = image.convert(self.mode)

        # Each block contains one row
        builder = DelegatingBlockBuilder()
        array = np.array(image)
        item = {"image": array}
        builder.add(item)
        yield builder.build()

读取你的数据#

实现 ImageDatasource 后,调用 read_datasource() 将图像读取到 Dataset 中。Ray Data 会并行读取你的文件。

import ray

ds = ray.data.read_datasource(
    ImageDatasource("s3://anonymous@ray-example-data/batoidea", mode="RGB")
)

将数据写入文件#

注意

写入接口正在积极开发中,未来可能会发生变化。如果你有功能需求,请在 GitHub 上提交 Issue

用于将数据写入文件的核心抽象是 RowBasedFileDatasinkBlockBasedFileDatasink。它们在 Datasink 接口之上提供了特定于文件的功能。

如果你想每个文件写入一行,请继承 RowBasedFileDatasink。否则,继承 BlockBasedFileDatasink

在本例中,你将每个文件写入一个图像,因此你将继承 RowBasedFileDatasink。要继承 RowBasedFileDatasink,需要实现构造函数和 write_row_to_file() 方法。

实现构造函数#

调用超类构造函数并指定要写入的文件夹。可选地,指定一个表示文件格式的字符串(例如,"png")。Ray Data 会将文件格式用作文件扩展名。

from ray.data.datasource import RowBasedFileDatasink

class ImageDatasink(RowBasedFileDatasink):
    def __init__(self, path: str, column: str, file_format: str):
        super().__init__(path, file_format=file_format)

        self.column = column
        self.file_format = file_format  # Specify write options in the constructor

实现 write_row_to_file 方法#

write_row_to_file 将一行数据写入文件。每一行都是一个字典,将列名映射到值。

    def write_row_to_file(self, row: Dict[str, Any], file: pyarrow.NativeFile):
        import io
        from PIL import Image

        # PIL can't write to a NativeFile, so we have to write to a buffer first.
        image = Image.fromarray(row[self.column])
        buffer = io.BytesIO()
        image.save(buffer, format=self.file_format)
        file.write(buffer.getvalue())

写入你的数据#

实现 ImageDatasink 后,调用 write_datasink() 将图像写入文件。Ray Data 会并行写入多个文件。

ds.write_datasink(ImageDatasink("/tmp/results", column="image", file_format="png"))