高级:读取和写入自定义文件类型#
本指南将向您展示如何扩展 Ray Data 来读取和写入 Ray Data 不支持的自定义文件类型。这是一个高级指南,您将使用不稳定的内部 API。
图像已通过 read_images() 和 write_images() API 支持,但本示例出于说明目的,将向您展示如何实现它们。
从文件读取数据#
提示
如果您不为 Ray Data 贡献代码,则无需创建 Datasource。而是可以调用 read_binary_files() 并使用 map() 解码文件。
读取文件的核心抽象是 FileBasedDatasource。它在 Datasource 接口之上提供了文件特定的功能。
要继承 FileBasedDatasource,请实现构造函数和 _read_stream。
实现构造函数#
调用超类构造函数并指定要读取的文件。可选地,指定有效的文件扩展名。Ray Data 会忽略其他扩展名的文件。
from ray.data.datasource import FileBasedDatasource
class ImageDatasource(FileBasedDatasource):
def __init__(self, paths: Union[str, List[str]], *, mode: str):
super().__init__(
paths,
file_extensions=["png", "jpg", "jpeg", "bmp", "gif", "tiff"],
)
self.mode = mode # Specify read options in the constructor
实现 _read_stream#
_read_stream 是一个生成器,它从文件中产生一个或多个数据块。
Blocks 是 Data 内部用于表示行集合的抽象。它们可以是 PyArrow 表、pandas DataFrame 或 NumPy 数组的字典。
不要直接创建块。而是将数据行添加到 DelegatingBlockBuilder 中。
def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
import io
import numpy as np
from PIL import Image
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
data = f.readall()
image = Image.open(io.BytesIO(data))
image = image.convert(self.mode)
# Each block contains one row
builder = DelegatingBlockBuilder()
array = np.asarray(image)
item = {"image": array}
builder.add(item)
yield builder.build()
读取您的数据#
在实现了 ImageDatasource 后,调用 read_datasource() 将图像读取到 Dataset 中。Ray Data 会并行读取您的文件。
import ray
ds = ray.data.read_datasource(
ImageDatasource("s3://anonymous@ray-example-data/batoidea", mode="RGB")
)
写入文件数据#
注意
写入接口正在积极开发中,未来可能会发生变化。如果您有功能请求,请 打开一个 GitHub Issue。
将数据写入文件的核心抽象是 RowBasedFileDatasink 和 BlockBasedFileDatasink。它们在 Datasink 接口之上提供了文件特定的功能。
如果您想每行写入一个文件,请继承 RowBasedFileDatasink。否则,请继承 BlockBasedFileDatasink。
在此示例中,您将每张图像写入一个文件,因此您将继承 RowBasedFileDatasink。要继承 RowBasedFileDatasink,请实现构造函数和 write_row_to_file()。
实现构造函数#
调用超类构造函数并指定要写入的文件夹。可选地,指定一个代表文件格式的字符串(例如,"png")。Ray Data 使用文件格式作为文件扩展名。
from ray.data.datasource import RowBasedFileDatasink
class ImageDatasink(RowBasedFileDatasink):
def __init__(self, path: str, column: str, file_format: str):
super().__init__(path, file_format=file_format)
self.column = column
self.file_format = file_format # Specify write options in the constructor
实现 write_row_to_file#
write_row_to_file 将一行数据写入文件。每一行都是一个字典,将列名映射到值。
def write_row_to_file(self, row: Dict[str, Any], file: pyarrow.NativeFile):
import io
from PIL import Image
# PIL can't write to a NativeFile, so we have to write to a buffer first.
image = Image.fromarray(row[self.column])
buffer = io.BytesIO()
image.save(buffer, format=self.file_format)
file.write(buffer.getvalue())
写入您的数据#
在实现了 ImageDatasink 后,调用 write_datasink() 将图像写入文件。Ray Data 会并行写入多个文件。
ds.write_datasink(ImageDatasink("/tmp/results", column="image", file_format="png"))