高级:读取和写入自定义文件类型#
本指南将向你展示如何扩展 Ray Data 以读取和写入 Ray 原生不支持的文件类型。这是一份高级指南,你将使用不稳定的内部 API。
Ray Data 已通过 read_images()
和 write_images()
API 支持图像读写,但本示例将演示如何实现这些功能,仅为说明目的。
从文件读取数据#
提示
如果你不打算贡献 Ray Data,则无需创建 Datasource
。你可以直接调用 read_binary_files()
并使用 map()
解码文件。
用于读取文件的核心抽象是 FileBasedDatasource
。它在 Datasource
接口之上提供了特定于文件的功能。
要继承 FileBasedDatasource
,需要实现构造函数和 _read_stream
方法。
实现构造函数#
调用超类构造函数并指定你要读取的文件。可选地,指定有效的文件扩展名。Ray Data 会忽略具有其他扩展名的文件。
from ray.data.datasource import FileBasedDatasource
class ImageDatasource(FileBasedDatasource):
def __init__(self, paths: Union[str, List[str]], *, mode: str):
super().__init__(
paths,
file_extensions=["png", "jpg", "jpeg", "bmp", "gif", "tiff"],
)
self.mode = mode # Specify read options in the constructor
实现 _read_stream
方法#
_read_stream
是一个生成器,它从文件中生成一个或多个数据块。
块 (Blocks) 是 Ray Data 内部用于表示一组行数据的抽象。它们可以是 PyArrow 表、pandas DataFrames 或 NumPy 数组的字典。
不要直接创建块。而是将数据行添加到 DelegatingBlockBuilder 中。
def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
import io
import numpy as np
from PIL import Image
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
data = f.readall()
image = Image.open(io.BytesIO(data))
image = image.convert(self.mode)
# Each block contains one row
builder = DelegatingBlockBuilder()
array = np.array(image)
item = {"image": array}
builder.add(item)
yield builder.build()
读取你的数据#
实现 ImageDatasource
后,调用 read_datasource()
将图像读取到 Dataset
中。Ray Data 会并行读取你的文件。
import ray
ds = ray.data.read_datasource(
ImageDatasource("s3://anonymous@ray-example-data/batoidea", mode="RGB")
)
将数据写入文件#
注意
写入接口正在积极开发中,未来可能会发生变化。如果你有功能需求,请在 GitHub 上提交 Issue。
用于将数据写入文件的核心抽象是 RowBasedFileDatasink
和 BlockBasedFileDatasink
。它们在 Datasink
接口之上提供了特定于文件的功能。
如果你想每个文件写入一行,请继承 RowBasedFileDatasink
。否则,继承 BlockBasedFileDatasink
。
在本例中,你将每个文件写入一个图像,因此你将继承 RowBasedFileDatasink
。要继承 RowBasedFileDatasink
,需要实现构造函数和 write_row_to_file()
方法。
实现构造函数#
调用超类构造函数并指定要写入的文件夹。可选地,指定一个表示文件格式的字符串(例如,"png"
)。Ray Data 会将文件格式用作文件扩展名。
from ray.data.datasource import RowBasedFileDatasink
class ImageDatasink(RowBasedFileDatasink):
def __init__(self, path: str, column: str, file_format: str):
super().__init__(path, file_format=file_format)
self.column = column
self.file_format = file_format # Specify write options in the constructor
实现 write_row_to_file
方法#
write_row_to_file
将一行数据写入文件。每一行都是一个字典,将列名映射到值。
def write_row_to_file(self, row: Dict[str, Any], file: pyarrow.NativeFile):
import io
from PIL import Image
# PIL can't write to a NativeFile, so we have to write to a buffer first.
image = Image.fromarray(row[self.column])
buffer = io.BytesIO()
image.save(buffer, format=self.file_format)
file.write(buffer.getvalue())
写入你的数据#
实现 ImageDatasink
后,调用 write_datasink()
将图像写入文件。Ray Data 会并行写入多个文件。
ds.write_datasink(ImageDatasink("/tmp/results", column="image", file_format="png"))