工作流管理#

警告

实验性的 Ray Workflows 库已弃用,并将在 Ray 的未来版本中移除。

工作流 ID#

每个工作流都有一个唯一的 workflow_id。默认情况下,当您调用 .run().run_async() 时,会生成一个随机 ID。建议您通过 .run(workflow_id="id") 显式为每个工作流分配一个 ID。

如果使用先前创建的工作流 ID 调用 .run(),工作流将从上次执行的地方恢复。

工作流状态#

工作流可以处于以下几种状态之一

状态

描述

RUNNING

工作流当前正在集群中运行。

PENDING

工作流已排队等待执行。

FAILED

此工作流因应用程序错误而失败。可以从失败的任务恢复。

RESUMABLE

此工作流因系统错误而失败。可以从失败的任务恢复。

CANCELED

工作流已被取消。其结果不可用,且无法恢复。

SUCCESSFUL

工作流已成功执行。

单一工作流管理 API#

import ray
from ray import workflow

@ray.remote
def task():
    return 3

workflow.run(task.bind(), workflow_id="workflow_id")

# Get the status of a workflow.
try:
    status = workflow.get_status(workflow_id="workflow_id")
    assert status in {
        "RUNNING", "RESUMABLE", "FAILED",
        "CANCELED", "SUCCESSFUL"}
except workflow.exceptions.WorkflowNotFoundError:
    print("Workflow doesn't exist.")

# Resume a workflow.
print(workflow.resume(workflow_id="workflow_id"))
# return is an ObjectRef which is the result of this workflow

# Cancel a workflow.
workflow.cancel(workflow_id="workflow_id")

# Delete the workflow.
workflow.delete(workflow_id="workflow_id")
3

批量工作流管理 API#

# List all running workflows.
print(workflow.list_all("RUNNING"))

# List RUNNING and CANCELED workflows.
print(workflow.list_all({"RUNNING", "CANCELED"}))

# List all workflows.
print(workflow.list_all())

# Resume all resumable workflows. This won't include failed workflow
print(workflow.resume_all())

# To resume workflows including failed ones, use `include_failed=True`
print(workflow.resume_all(include_failed=True))
[("workflow_id_1", "RUNNING"), ("workflow_id_2", "RUNNING")]
[("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
[("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
[("workflow_id_1", ObjectRef), ("workflow_id_2", ObjectRef)]
[("workflow_id_1", ObjectRef), ("workflow_id_3", ObjectRef)]

定期工作流#

Ray Workflows 目前没有内置的作业调度器。但是,您可以轻松使用任何外部作业调度器与您的 Ray 集群交互(通过作业提交)来触发工作流运行。

存储配置#

Ray Workflows 开箱即支持多种类型的存储后端,包括

  • 本地文件系统:数据存储在本地。此选项仅适用于单节点测试,因为对于多节点集群,数据必须存储在共享文件系统(例如 NFS)上。要使用本地存储,请指定 ray.init(storage="/path/to/storage_dir")ray start --head --storage="/path/to/storage_dir"

  • S3:这是生产环境的常用选择,因为它提供可伸缩和持久的对象存储。使用 ray.init(storage="s3://bucket/path")ray start --head --storage="s3://bucket/path" 启用 S3 存储。

Ray 内部使用 pyarrow 作为存储引擎。有关 pyarrow 支持的完整存储选项列表,请参阅Pyarrow.fs.FileSystem 文档。

注意

如果您在使用 pyarrow 支持的存储选项时遇到问题,请确保您安装了正确版本的 pyarrow。例如,GCS (Google Cloud Storage) 文件系统仅在 pyarrow >= 9.0 中支持。

如果未指定,将使用 /tmp/ray/workflow_data 作为临时存储。此默认设置仅适用于单节点 Ray 集群

并发控制#

Ray Workflows 支持并发控制。您可以在执行任何工作流之前通过 workflow.init() 支持最大运行工作流数和最大挂起工作流数。再次使用不同配置调用 workflow.init() 会引发错误,除非给定 None

例如,workflow.init(max_running_workflows=10, max_pending_workflows=50) 意味着最多有 10 个工作流正在运行,50 个工作流挂起。在另一个驱动程序上使用不同的值调用将引发异常。如果它们被设置为 None,则将使用之前设置的值。

当挂起工作流数达到最大值时提交工作流会引发 queue.Full("Workflow queue has been full")。获取挂起工作流的输出将被阻塞,直到工作流稍后完成运行。

挂起的工作流状态为 PENDING。挂起的工作流在中断后(例如,集群故障)可以恢复。使用 workflow.resume_all() 恢复中断的运行和挂起工作流时,运行中的工作流具有比挂起工作流更高的优先级(即,挂起工作流可能仍然处于挂起状态)。

注意

Workflows 不保证恢复的工作流按照相同的顺序运行。