如何编写测试#
注意
免责声明:软件工程中没有硬性规定。在应用这些准则时,请自行判断。
易碎或脆弱的测试(即当假设发生变化时就会中断的测试)会拖慢开发速度。没有人愿意因为一个与他们更改无关的原因导致测试失败而卡在 PR 上。
本指南收集了一些实践方法,以帮助您编写能够支持 Ray Data 项目而不是拖慢它的测试。
通用最佳实践#
优先选择单元测试而非集成测试#
单元测试能提供更快的反馈,并且更容易定位失败原因。它们运行速度以毫秒计,而不是秒,并且不依赖于 Ray 集群、外部系统或时序。这使得测试套件保持快速、可靠且易于维护。
注意
将单元测试放在 python/ray/data/tests/unit 目录下。
使用 fixtures,避免 try-finally#
Fixtures 使测试更简洁、更可重用且隔离性更好。它们是用于设置和清理的理想工具,尤其适用于 monkeypatch 等场景。
try-finally 有效,但 fixtures 更能清晰地表达意图并避免样板代码。
原始代码
def test_dynamic_block_split(ray_start_regular_shared):
ctx = ray.data.context.DataContext.get_current()
original_target_max_block_size = ctx.target_max_block_size
ctx.target_max_block_size = 1
try:
...
finally:
ctx.target_max_block_size = original_target_max_block_size
改进后
def test_dynamic_block_split(ray_start_regular_shared, restore_data_context):
ctx = ray.data.context.DataContext.get_current()
target_max_block_size = ctx.target_max_block_size
... # No need for try-finally
Ray 特有实践#
不要假定 Datasets 会以特定顺序产生输出#
除非您在 DataContext 中设置了 preserve_order=True,否则 Ray Data 不保证输出顺序。如果您的测试在未明确要求的情况下依赖于顺序,那么您将面临脆弱的失败。
原始代码
ds_dfs = []
for path in os.listdir(out_path):
assert path.startswith("data_") and path.endswith(".parquet")
ds_dfs.append(pd.read_parquet(os.path.join(out_path, path)))
ds_df = pd.concat(ds_dfs).reset_index(drop=True)
df = pd.concat([df1, df2]).reset_index(drop=True)
assert ds_df.equals(df)
改进后
from ray.data._internal.util import rows_same
actual_data = pd.read_parquet(out_path)
expected_data = pd.concat([df1, df2]
assert rows_same(actual_data, expected_data)
提示
使用 ray.data._internal.util.rows_same 工具函数来比较 pandas DataFrames 的相等性,同时忽略索引和顺序。
避免针对 repr 输出进行测试来验证特定数据#
repr 输出不是任何接口合同的一部分——它可能随时更改。此外,断言 repr 的测试通常会隐藏真实意图:您是想检查数据,还是仅仅检查它恰好如何打印?明确您关心的是什么。
原始代码
assert str(ds) == "Dataset(num_rows=6, schema={one: int64, two: string})", ds
改进后
assert ds.schema() == Schema(pa.schema({"one": pa.int64(), "two": pa.string()}))
assert ds.count() == 6
避免对块的数量或大小做出假设#
除非您正在测试 repartition 这样的 API,否则不要将您的测试锁定在特定的块数量或大小上。这两者都可能根据实现或集群配置而变化——而且这通常是没问题的。
原始代码
ds = ray.data.read_parquet(paths + [txt_path], filesystem=fs)
assert ds._plan.initial_num_blocks() == 2 # Where does 2 come from?
assert rows_same(ds.to_pandas(), expected_data)
改进后
ds = ray.data.read_parquet(paths + [txt_path], filesystem=fs)
# Assertion about number of blocks has been removed.
assert rows_same(ds.to_pandas(), expected_data)
原始代码
ds2 = ds.repartition(5)
assert ds2._plan.initial_num_blocks() == 5
assert ds2._block_num_rows() == [10, 10, 0, 0, 0] # Magic numbers?
改进后
ds2 = ds.repartition(5)
assert sum(len(bundle.blocks) for bundle in ds.iter_internal_ref_bundles()) == 5
# Assertion about the number of rows in each block has been removed.
避免测试 DAG 的外观是否符合特定方式#
随着实现的不断演进,执行计划中的算子可能会随时间发生变化。除非您专门在测试优化规则或在算子级别工作,否则测试不应期望特定的 DAG 结构。
原始代码
# Check that metadata fetch is included in stats.
assert "FromArrow" in ds.stats()
# Underlying implementation uses `FromArrow` operator
assert ds._plan._logical_plan.dag.name == "FromArrow"
改进后
# (Assertions removed).