如何编写测试#

注意

免责声明:软件工程中没有硬性规定。在应用这些准则时,请自行判断。

易碎或脆弱的测试(即当假设发生变化时就会中断的测试)会拖慢开发速度。没有人愿意因为一个与他们更改无关的原因导致测试失败而卡在 PR 上。

本指南收集了一些实践方法,以帮助您编写能够支持 Ray Data 项目而不是拖慢它的测试。

通用最佳实践#

优先选择单元测试而非集成测试#

单元测试能提供更快的反馈,并且更容易定位失败原因。它们运行速度以毫秒计,而不是秒,并且不依赖于 Ray 集群、外部系统或时序。这使得测试套件保持快速、可靠且易于维护。

注意

将单元测试放在 python/ray/data/tests/unit 目录下。

使用 fixtures,避免 try-finally#

Fixtures 使测试更简洁、更可重用且隔离性更好。它们是用于设置和清理的理想工具,尤其适用于 monkeypatch 等场景。

try-finally 有效,但 fixtures 更能清晰地表达意图并避免样板代码。

原始代码

def test_dynamic_block_split(ray_start_regular_shared):
    ctx = ray.data.context.DataContext.get_current()
    original_target_max_block_size = ctx.target_max_block_size

    ctx.target_max_block_size = 1
    try: 
        ...
    finally:
        ctx.target_max_block_size = original_target_max_block_size

改进后

def test_dynamic_block_split(ray_start_regular_shared, restore_data_context):
    ctx = ray.data.context.DataContext.get_current()
    target_max_block_size = ctx.target_max_block_size
    ... # No need for try-finally

Ray 特有实践#

不要假定 Datasets 会以特定顺序产生输出#

除非您在 DataContext 中设置了 preserve_order=True,否则 Ray Data 不保证输出顺序。如果您的测试在未明确要求的情况下依赖于顺序,那么您将面临脆弱的失败。

原始代码

ds_dfs = []
for path in os.listdir(out_path):
    assert path.startswith("data_") and path.endswith(".parquet")
    ds_dfs.append(pd.read_parquet(os.path.join(out_path, path)))

ds_df = pd.concat(ds_dfs).reset_index(drop=True)
df = pd.concat([df1, df2]).reset_index(drop=True)
assert ds_df.equals(df)

改进后

from ray.data._internal.util import rows_same

actual_data = pd.read_parquet(out_path)
expected_data = pd.concat([df1, df2]
assert rows_same(actual_data, expected_data)

提示

使用 ray.data._internal.util.rows_same 工具函数来比较 pandas DataFrames 的相等性,同时忽略索引和顺序。

优先选择共享集群 fixtures#

优先选择共享集群 fixtures,如 ray_start_regular_shared,而不是隔离集群 fixtures,如 shutdown_onlyray_start_regular

shutdown_onlyray_start_regular 会在每个测试完成后重新启动 Ray 集群。启动和停止 Ray 可能需要超过一秒钟——这听起来很短,但累积到成千上万个测试(加上参数化)时,会迅速增加。

只有当您的测试确实需要一个全新的集群时,才使用隔离集群。

注意

这里存在隔离性和速度之间的固有权衡。在此特定情况下,请选择优先考虑速度。

原始代码

@pytest.mark.parametrize("concurrency", [-1, 1.5], ids=["negative", "float"])
def test_invalid_concurrency_raises(shutdown_only, concurrency):
    ds = ray.data.range(1)  # Each parametrization restarts the Ray cluster!
    with pytest.raises(ValueError):
        ds.map(lambda row: row, concurrency=concurrency)

改进后

@pytest.mark.parametrize("concurrency", [-1, 1.5], ids=["negative", "float"])
def test_invalid_concurrency_raises(ray_start_regular_shared, concurrency):
    ds = ray.data.range(1)  # Each parametrization reuses the same Ray cluster.
    with pytest.raises(ValueError):
        ds.map(lambda row: row, concurrency=concurrency)

避免针对 repr 输出进行测试来验证特定数据#

repr 输出不是任何接口合同的一部分——它可能随时更改。此外,断言 repr 的测试通常会隐藏真实意图:您是想检查数据,还是仅仅检查它恰好如何打印?明确您关心的是什么。

原始代码

assert str(ds) == "Dataset(num_rows=6, schema={one: int64, two: string})", ds

改进后

assert ds.schema() == Schema(pa.schema({"one": pa.int64(), "two": pa.string()}))
assert ds.count() == 6

避免对块的数量或大小做出假设#

除非您正在测试 repartition 这样的 API,否则不要将您的测试锁定在特定的块数量或大小上。这两者都可能根据实现或集群配置而变化——而且这通常是没问题的。

原始代码

ds = ray.data.read_parquet(paths + [txt_path], filesystem=fs)
assert ds._plan.initial_num_blocks() == 2  # Where does 2 come from?
assert rows_same(ds.to_pandas(), expected_data)

改进后

ds = ray.data.read_parquet(paths + [txt_path], filesystem=fs)
# Assertion about number of blocks has been removed.
assert rows_same(ds.to_pandas(), expected_data)

原始代码

ds2 = ds.repartition(5)
assert ds2._plan.initial_num_blocks() == 5
assert ds2._block_num_rows() == [10, 10, 0, 0, 0]  # Magic numbers?

改进后

ds2 = ds.repartition(5)
assert sum(len(bundle.blocks) for bundle in ds.iter_internal_ref_bundles()) == 5
# Assertion about the number of rows in each block has been removed.

避免测试 DAG 的外观是否符合特定方式#

随着实现的不断演进,执行计划中的算子可能会随时间发生变化。除非您专门在测试优化规则或在算子级别工作,否则测试不应期望特定的 DAG 结构。

原始代码

# Check that metadata fetch is included in stats.
assert "FromArrow" in ds.stats()
# Underlying implementation uses `FromArrow` operator
assert ds._plan._logical_plan.dag.name == "FromArrow"

改进后

# (Assertions removed).