Inspecting Data

Inspect Datasets to better understand your data.

This guide shows you how to:

- Describe datasets
- Inspect rows
- Inspect batches
- Inspect execution statistics

Describing datasets

Datasets are tabular. To view a Dataset's column names and types, call Dataset.schema().

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

print(ds.schema())
Column             Type
------             ----
sepal length (cm)  double
sepal width (cm)   double
petal length (cm)  double
petal width (cm)   double
target             int64

For more information, such as the number of rows, print the Dataset.

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

print(ds)
Dataset(num_rows=..., schema=...)

Inspecting rows

To get a list of rows, call Dataset.take() or Dataset.take_all(). Ray Data represents each row as a dictionary.

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

rows = ds.take(1)
print(rows)
[{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}]

For more information on working with rows, see Transforming rows and Iterating over rows.
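Because each row is a plain dictionary, the output of take() can be handed directly to other tools. A minimal sketch (using a hardcoded row matching the output above, so it runs without S3 access):

```python
import pandas as pd

# Rows as returned by Dataset.take(): a list of dictionaries.
rows = [
    {"sepal length (cm)": 5.1, "sepal width (cm)": 3.5,
     "petal length (cm)": 1.4, "petal width (cm)": 0.2, "target": 0}
]

# Each dictionary key is a column name, so building a DataFrame
# for quick ad-hoc inspection is one call away.
df = pd.DataFrame(rows)
print(df)
```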

Inspecting batches

A batch contains data from multiple rows. To inspect batches, call Dataset.take_batch().

By default, Ray Data represents batches as dictionaries of NumPy ndarrays. To change the type of the returned batch, set batch_format. The batch format is independent of how Ray Data stores the underlying blocks, so you can choose any batch format regardless of the internal block representation.

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

batch = ds.take_batch(batch_size=2, batch_format="numpy")
print("Batch:", batch)
print("Image shape", batch["image"].shape)
Batch: {'image': array([[[[...]]]], dtype=uint8)}
Image shape: (2, 32, 32, 3)

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

batch = ds.take_batch(batch_size=2, batch_format="pandas")
print(batch)
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  target
0                5.1               3.5  ...               0.2       0
1                4.9               3.0  ...               0.2       0

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

batch = ds.take_batch(batch_size=2, batch_format="pyarrow")
print(batch)
pyarrow.Table
sepal length (cm): double
sepal width (cm): double
petal length (cm): double
petal width (cm): double
target: int64
----
sepal length (cm): [[5.1,4.9]]
sepal width (cm): [[3.5,3]]
petal length (cm): [[1.4,1.4]]
petal width (cm): [[0.2,0.2]]
target: [[0,0]]

For more information on working with batches, see Transforming batches and Iterating over batches.
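To make the columnar layout concrete, here is a sketch (pure NumPy, independent of Ray Data) of how a batch dictionary of ndarrays relates to the individual row dictionaries shown earlier:

```python
import numpy as np

# A batch in "numpy" format: one ndarray per column,
# with values from all rows stacked along the first axis.
batch = {
    "sepal length (cm)": np.array([5.1, 4.9]),
    "target": np.array([0, 0]),
}

# Recover individual rows by indexing every column at the same position.
rows = [
    {name: col[i] for name, col in batch.items()}
    for i in range(len(batch["target"]))
]
print(rows)
```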

Inspecting execution statistics

Ray Data computes statistics during execution, such as wall-clock time and memory usage, for each operator.

To view stats about your Datasets, call Dataset.stats() on an executed dataset. The stats are also persisted under /tmp/ray/session_*/logs/ray-data/ray-data.log. For more information on how to read this output, see Monitoring your workload with the Ray Data dashboard.

import ray
from huggingface_hub import HfFileSystem

def f(batch):
    return batch

def g(row):
    return True

path = "hf://datasets/ylecun/mnist/mnist/"

fs = HfFileSystem()
# Use a distinct loop variable so it doesn't read as a reference to the UDF `f` above.
train_files = [file["name"] for file in fs.ls(path) if "train" in file["name"] and file["name"].endswith(".parquet")]
ds = (
    ray.data.read_parquet(train_files, filesystem=fs)
    .map_batches(f)
    .filter(g)
    .materialize()
)

print(ds.stats())
Operator 1 ReadParquet->SplitBlocks(32): 1 tasks executed, 32 blocks produced in 2.92s
* Remote wall time: 103.38us min, 1.34s max, 42.14ms mean, 1.35s total
* Remote cpu time: 102.0us min, 164.66ms max, 5.37ms mean, 171.72ms total
* UDF time: 0us min, 0us max, 0.0us mean, 0us total
* Peak heap memory usage (MiB): 266375.0 min, 281875.0 max, 274491 mean
* Output num rows per block: 1875 min, 1875 max, 1875 mean, 60000 total
* Output size bytes per block: 537986 min, 555360 max, 545963 mean, 17470820 total
* Output rows per task: 60000 min, 60000 max, 60000 mean, 1 tasks used
* Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used
* Operator throughput:
    * Ray Data throughput: 20579.80984833993 rows/s
    * Estimated single node throughput: 44492.67361278733 rows/s

Operator 2 MapBatches(f)->Filter(g): 32 tasks executed, 32 blocks produced in 3.63s
* Remote wall time: 675.48ms min, 1.0s max, 797.07ms mean, 25.51s total
* Remote cpu time: 673.41ms min, 897.32ms max, 768.09ms mean, 24.58s total
* UDF time: 661.65ms min, 978.04ms max, 778.13ms mean, 24.9s total
* Peak heap memory usage (MiB): 152281.25 min, 286796.88 max, 164231 mean
* Output num rows per block: 1875 min, 1875 max, 1875 mean, 60000 total
* Output size bytes per block: 530251 min, 547625 max, 538228 mean, 17223300 total
* Output rows per task: 1875 min, 1875 max, 1875 mean, 32 tasks used
* Tasks per node: 32 min, 32 max, 32 mean; 1 nodes used
* Operator throughput:
    * Ray Data throughput: 16512.364546087643 rows/s
    * Estimated single node throughput: 2352.3683708977856 rows/s

Dataset throughput:
    * Ray Data throughput: 11463.372316361854 rows/s
    * Estimated single node throughput: 25580.963670075285 rows/s
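The stats() output is a plain string, so you can also extract figures from it programmatically. A sketch, assuming the throughput lines keep the format shown above (this parsing helper is not part of the Ray Data API):

```python
import re

# A fragment of the stats() output shown above.
stats = """Dataset throughput:
    * Ray Data throughput: 11463.372316361854 rows/s
    * Estimated single node throughput: 25580.963670075285 rows/s
"""

# Pull every "* <label> throughput: <value> rows/s" figure out of the report.
throughputs = {
    label: float(value)
    for label, value in re.findall(r"\* (.+?) throughput: ([\d.]+) rows/s", stats)
}
print(throughputs)
```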