关键概念#

警告

实验性的 Ray Workflows 库已被弃用,并将在未来版本的 Ray 中移除。

注意

Workflows 是一个为 Ray 任务图提供强大持久性的库。如果你是 Ray 的新手,我们建议先从核心演练开始。

DAG API#

通常,Ray 任务是急切执行的。为了提供持久性,Ray Workflows 使用惰性 Ray DAG API 来分离任务 DAG 的定义和执行。

从 Ray 任务切换到 DAG API 很简单:只需将所有 .remote(...) 调用(返回对象引用)替换为 .bind(...) 调用(返回 DAG 节点)。Ray DAG 节点可以像普通 Ray 任务一样组合。

然而,与 Ray 任务不同,不允许在 DAG 节点上调用 ray.get()ray.wait()。相反,需要对 DAG 进行 执行 才能计算结果。

将函数组合成 DAG

import ray

@ray.remote
def one() -> int:
    return 1

@ray.remote
def add(a: int, b: int) -> int:
    return a + b

dag = add.bind(100, one.bind())

工作流执行#

要使用工作流执行 DAG,请使用 workflow.run

from ray import workflow

# Run the workflow until it completes and returns the output
assert workflow.run(dag) == 101

# Or you can run it asynchronously and fetch the output via 'ray.get'
output_ref = workflow.run_async(dag)
assert ray.get(output_ref) == 101

一旦启动,工作流的执行会持久化记录到存储中。系统发生故障时,工作流可以在任何有权访问该存储的 Ray 集群上恢复。

执行工作流 DAG 时,工作流任务会在失败时重试,但一旦成功完成并且结果由工作流引擎持久化,它们将不再运行。

获取工作流结果

# configure the storage with "ray.init" or "ray start --head --storage=<STORAGE_URI>"
# A default temporary storage is used by by the workflow if starting without
# Ray init.
ray.init(storage="/tmp/data")
assert workflow.run(dag, workflow_id="run_1") == 101
assert workflow.get_status("run_1") == workflow.WorkflowStatus.SUCCESSFUL
assert workflow.get_output("run_1") == 101
# workflow.get_output_async returns an ObjectRef.
assert ray.get(workflow.get_output_async("run_1")) == 101

对象#

Workflows 与 Ray 对象无缝集成,允许 Ray 对象引用作为任务的输入和输出。对象在首次从任务返回时会创建检查点。创建检查点后,该对象可以通过 Ray 对象存储以内存速度在任意数量的工作流任务之间共享。

在工作流中使用 Ray 对象

import ray
from typing import List

@ray.remote
def hello():
    return "hello"

@ray.remote
def words() -> List[ray.ObjectRef]:
    # NOTE: Here it is ".remote()" instead of ".bind()", so
    # it creates an ObjectRef instead of a DAG.
    return [hello.remote(), ray.put("world")]

@ray.remote
def concat(words: List[ray.ObjectRef]) -> str:
    return " ".join([ray.get(w) for w in words])

assert workflow.run(concat.bind(words.bind())) == "hello world"

动态工作流#

工作流可以在运行时生成新任务。这是通过返回 DAG 的一个延续来实现的。延续是由函数返回并在函数返回后执行的东西。延续特性使工作流能够实现嵌套、循环和递归。

斐波那契递归工作流

@ray.remote
def add(a: int, b: int) -> int:
    return a + b

@ray.remote
def fib(n: int) -> int:
    if n <= 1:
        return n
    # return a continuation of a DAG
    return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2)))

assert workflow.run(fib.bind(10)) == 55

事件#

事件是发送到工作流的外部信号。工作流可以使用事件系统通过定时器或外部事件高效地触发。

import time

# Sleep is a special type of event.
sleep_task = workflow.sleep(1)

# `wait_for_events` allows for pluggable event listeners.
event_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)

@ray.remote
def gather(*args):
    return args

# If a task's arguments include events, the task won't be executed until all
# of the events have occurred.
workflow.run(gather.bind(sleep_task, event_task, "hello world"))