关键概念#
警告
实验性的 Ray Workflows 库已被弃用,并将在未来版本的 Ray 中移除。
注意
Workflows 是一个为 Ray 任务图提供强大持久性的库。如果你是 Ray 的新手,我们建议先从核心演练开始。
DAG API#
通常,Ray 任务是急切执行的。为了提供持久性,Ray Workflows 使用惰性 Ray DAG API 来分离任务 DAG 的定义和执行。
从 Ray 任务切换到 DAG API 很简单:只需将所有 .remote(...)
调用(返回对象引用)替换为 .bind(...)
调用(返回 DAG 节点)。Ray DAG 节点可以像普通 Ray 任务一样组合。
然而,与 Ray 任务不同,不允许在 DAG 节点上调用 ray.get()
或 ray.wait()
。相反,需要对 DAG 进行 执行 才能计算结果。
将函数组合成 DAG
import ray
@ray.remote
def one() -> int:
return 1
@ray.remote
def add(a: int, b: int) -> int:
return a + b
dag = add.bind(100, one.bind())
工作流执行#
要使用工作流执行 DAG,请使用 workflow.run
from ray import workflow
# Run the workflow until it completes and returns the output
assert workflow.run(dag) == 101
# Or you can run it asynchronously and fetch the output via 'ray.get'
output_ref = workflow.run_async(dag)
assert ray.get(output_ref) == 101
一旦启动,工作流的执行会持久化记录到存储中。系统发生故障时,工作流可以在任何有权访问该存储的 Ray 集群上恢复。
执行工作流 DAG 时,工作流任务会在失败时重试,但一旦成功完成并且结果由工作流引擎持久化,它们将不再运行。
获取工作流结果
# configure the storage with "ray.init" or "ray start --head --storage=<STORAGE_URI>"
# A default temporary storage is used by by the workflow if starting without
# Ray init.
ray.init(storage="/tmp/data")
assert workflow.run(dag, workflow_id="run_1") == 101
assert workflow.get_status("run_1") == workflow.WorkflowStatus.SUCCESSFUL
assert workflow.get_output("run_1") == 101
# workflow.get_output_async returns an ObjectRef.
assert ray.get(workflow.get_output_async("run_1")) == 101
对象#
Workflows 与 Ray 对象无缝集成,允许 Ray 对象引用作为任务的输入和输出。对象在首次从任务返回时会创建检查点。创建检查点后,该对象可以通过 Ray 对象存储以内存速度在任意数量的工作流任务之间共享。
在工作流中使用 Ray 对象
import ray
from typing import List
@ray.remote
def hello():
return "hello"
@ray.remote
def words() -> List[ray.ObjectRef]:
# NOTE: Here it is ".remote()" instead of ".bind()", so
# it creates an ObjectRef instead of a DAG.
return [hello.remote(), ray.put("world")]
@ray.remote
def concat(words: List[ray.ObjectRef]) -> str:
return " ".join([ray.get(w) for w in words])
assert workflow.run(concat.bind(words.bind())) == "hello world"
动态工作流#
工作流可以在运行时生成新任务。这是通过返回 DAG 的一个延续来实现的。延续是由函数返回并在函数返回后执行的东西。延续特性使工作流能够实现嵌套、循环和递归。
斐波那契递归工作流
@ray.remote
def add(a: int, b: int) -> int:
return a + b
@ray.remote
def fib(n: int) -> int:
if n <= 1:
return n
# return a continuation of a DAG
return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2)))
assert workflow.run(fib.bind(10)) == 55
事件#
事件是发送到工作流的外部信号。工作流可以使用事件系统通过定时器或外部事件高效地触发。
import time
# Sleep is a special type of event.
sleep_task = workflow.sleep(1)
# `wait_for_events` allows for pluggable event listeners.
event_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)
@ray.remote
def gather(*args):
return args
# If a task's arguments include events, the task won't be executed until all
# of the events have occurred.
workflow.run(gather.bind(sleep_task, event_task, "hello world"))