工作流元数据#
警告
实验性的 Ray Workflows 库已被废弃,并将在未来的 Ray 版本中移除。
可观测性对工作流很重要——有时我们不仅想获取输出,还想深入了解内部状态(例如,测量性能或查找瓶颈)。工作流元数据提供了几个统计数据,有助于理解工作流,从基本的运行状态和任务选项到性能和用户添加的元数据。
获取元数据#
工作流元数据可以使用 workflow.get_metadata(workflow_id)
获取。例如
import ray
from ray import workflow
@ray.remote
def add(left: int, right: int) -> int:
return left + right
workflow.run(add.bind(10, 20), workflow_id="add_example")
workflow_metadata = workflow.get_metadata("add_example")
assert workflow_metadata["status"] == "SUCCESSFUL"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]
你也可以通过提供任务名称来获取单个工作流任务的元数据
workflow.run(
add.options(
**workflow.options(task_id="add_task")
).bind(10, 20), workflow_id="add_example_2")
task_metadata = workflow.get_metadata("add_example_2", task_id="add_task")
assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]
用户自定义元数据#
用户可以将自定义元数据添加到工作流或工作流任务中,这在你想为工作流或工作流任务附加一些额外信息时非常有用。
工作流级别的元数据可以通过
.run(metadata=metadata)
添加任务级别的元数据可以通过
.options(**workflow.options(metadata=metadata))
或在装饰器@workflow.options(metadata=metadata)
中添加
workflow.run(add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})
assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
assert workflow.get_metadata("add_example_3", task_id="add_task")["user_metadata"] == {"task_k": "task_v"}
注意:用户自定义元数据必须是一个 Python 字典,其值必须是 JSON 可序列化的。
可用指标#
工作流级别
status: 工作流状态,可以是 RUNNING (运行中), FAILED (失败), RESUMABLE (可恢复), CANCELED (已取消), 或 SUCCESSFUL (成功) 之一。
user_metadata: 用户通过
workflow.run()
添加的自定义元数据的 Python 字典。stats: 工作流运行统计信息,包括工作流开始时间和结束时间。
任务级别
name: 任务名称,可以是用户通过
task.options(**workflow.options(task_id=xxx))
提供的,或由系统生成的。task_options: 任务选项,可以是用户通过
task.options()
提供的,或系统默认的。user_metadata: 用户通过
task.options()
添加的自定义元数据的 Python 字典。stats: 任务运行统计信息,包括任务开始时间和结束时间。
注意事项#
1. 与 get_output()
不同,get_metadata()
在调用时立即返回结果,这也意味着如果相应的元数据尚未可用,结果中并非所有字段都会存在(例如,metadata["stats"]["end_time"]
在工作流完成前不会可用)。
import time
@ray.remote
def simple():
time.sleep(1000)
return 0
workflow.run_async(simple.bind(), workflow_id="workflow_id")
# make sure workflow task starts running
time.sleep(2)
workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "RUNNING"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]
workflow.cancel("workflow_id")
workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "CANCELED"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]
2. 对于已恢复的工作流,当前行为是每当工作流恢复时,“stats”都会更新。
from pathlib import Path
workflow_id = "simple"
error_flag = Path("error")
error_flag.touch()
@ray.remote
def simple():
if error_flag.exists():
raise ValueError()
return 0
try:
workflow.run(simple.bind(), workflow_id=workflow_id)
except ray.exceptions.RayTaskError:
pass
workflow_metadata_failed = workflow.get_metadata(workflow_id)
assert workflow_metadata_failed["status"] == "FAILED"
# remove flag to make task success
error_flag.unlink()
ref = workflow.resume_async(workflow_id)
assert ray.get(ref) == 0
workflow_metadata_resumed = workflow.get_metadata(workflow_id)
assert workflow_metadata_resumed["status"] == "SUCCESSFUL"
# make sure resume updated running metrics
assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
assert workflow_metadata_resumed["stats"]["end_time"] > workflow_metadata_failed["stats"]["end_time"]