工作流元数据#

警告

实验性的 Ray Workflows 库已被废弃,并将在未来的 Ray 版本中移除。

可观测性对工作流很重要——有时我们不仅想获取输出,还想深入了解内部状态(例如,测量性能或查找瓶颈)。工作流元数据提供了几个统计数据,有助于理解工作流,从基本的运行状态和任务选项到性能和用户添加的元数据。

获取元数据#

工作流元数据可以使用 workflow.get_metadata(workflow_id) 获取。例如

import ray
from ray import workflow

@ray.remote
def add(left: int, right: int) -> int:
    return left + right

workflow.run(add.bind(10, 20), workflow_id="add_example")

workflow_metadata = workflow.get_metadata("add_example")

assert workflow_metadata["status"] == "SUCCESSFUL"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]

你也可以通过提供任务名称来获取单个工作流任务的元数据

workflow.run(
    add.options(
        **workflow.options(task_id="add_task")
    ).bind(10, 20), workflow_id="add_example_2")

task_metadata = workflow.get_metadata("add_example_2", task_id="add_task")

assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]

用户自定义元数据#

用户可以将自定义元数据添加到工作流或工作流任务中,这在你想为工作流或工作流任务附加一些额外信息时非常有用。

  • 工作流级别的元数据可以通过 .run(metadata=metadata) 添加

  • 任务级别的元数据可以通过 .options(**workflow.options(metadata=metadata)) 或在装饰器 @workflow.options(metadata=metadata) 中添加

workflow.run(add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
    workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})

assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
assert workflow.get_metadata("add_example_3", task_id="add_task")["user_metadata"] == {"task_k": "task_v"}

注意:用户自定义元数据必须是一个 Python 字典,其值必须是 JSON 可序列化的。

可用指标#

工作流级别

  • status: 工作流状态,可以是 RUNNING (运行中), FAILED (失败), RESUMABLE (可恢复), CANCELED (已取消), 或 SUCCESSFUL (成功) 之一。

  • user_metadata: 用户通过 workflow.run() 添加的自定义元数据的 Python 字典。

  • stats: 工作流运行统计信息,包括工作流开始时间和结束时间。

任务级别

  • name: 任务名称,可以是用户通过 task.options(**workflow.options(task_id=xxx)) 提供的,或由系统生成的。

  • task_options: 任务选项,可以是用户通过 task.options() 提供的,或系统默认的。

  • user_metadata: 用户通过 task.options() 添加的自定义元数据的 Python 字典。

  • stats: 任务运行统计信息,包括任务开始时间和结束时间。

注意事项#

1. 与 get_output() 不同,get_metadata() 在调用时立即返回结果,这也意味着如果相应的元数据尚未可用,结果中并非所有字段都会存在(例如,metadata["stats"]["end_time"] 在工作流完成前不会可用)。

import time

@ray.remote
def simple():
    time.sleep(1000)
    return 0

workflow.run_async(simple.bind(), workflow_id="workflow_id")

# make sure workflow task starts running
time.sleep(2)

workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "RUNNING"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]

workflow.cancel("workflow_id")

workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "CANCELED"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]

2. 对于已恢复的工作流,当前行为是每当工作流恢复时,“stats”都会更新。

from pathlib import Path

workflow_id = "simple"

error_flag = Path("error")
error_flag.touch()

@ray.remote
def simple():
    if error_flag.exists():
        raise ValueError()
    return 0

try:
    workflow.run(simple.bind(), workflow_id=workflow_id)
except ray.exceptions.RayTaskError:
    pass

workflow_metadata_failed = workflow.get_metadata(workflow_id)
assert workflow_metadata_failed["status"] == "FAILED"

# remove flag to make task success
error_flag.unlink()
ref = workflow.resume_async(workflow_id)
assert ray.get(ref) == 0

workflow_metadata_resumed = workflow.get_metadata(workflow_id)
assert workflow_metadata_resumed["status"] == "SUCCESSFUL"

# make sure resume updated running metrics
assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
assert workflow_metadata_resumed["stats"]["end_time"] > workflow_metadata_failed["stats"]["end_time"]