容错性#
Ray 是一个分布式系统,这意味着故障随时可能发生。通常,Ray 将故障分为两类:1. 应用级别故障 2. 系统级别故障。用户代码中的错误或外部系统故障会触发应用级别故障。节点故障、网络故障或 Ray 自身的 bug 会触发系统级别故障。以下部分包含 Ray 为允许应用从故障中恢复而提供的机制。
为了处理应用级别故障,Ray 提供了捕获错误、重试失败代码和处理异常代码的机制。有关这些机制的更多信息,请参阅任务和Actor容错页面。
Ray 还提供了多种机制来自动从内部系统级别故障(例如节点故障)中恢复。特别是,Ray 可以自动从分布式对象存储中的某些故障中恢复。
如何编写容错的 Ray 应用#
有几个建议可以帮助 Ray 应用实现容错性
首先,如果 Ray 提供的容错机制不适合你,你总是可以捕获由故障引起的异常并手动恢复。
@ray.remote
class Actor:
def read_only(self):
import sys
import random
rand = random.random()
if rand < 0.2:
return 2 / 0
elif rand < 0.3:
sys.exit(1)
return 2
actor = Actor.remote()
# Manually retry the actor task.
while True:
try:
print(ray.get(actor.read_only.remote()))
break
except ZeroDivisionError:
pass
except ray.exceptions.RayActorError:
# Manually restart the actor
actor = Actor.remote()
其次,避免让 ObjectRef
的生命周期长于其所有者任务或 Actor(即通过调用 ray.put()
或 foo.remote()
创建初始 ObjectRef
的任务或 Actor)。只要对象仍有引用存在,其所有者 worker 即使在相应的任务或 Actor 完成后仍会继续运行。如果所有者 worker 发生故障,Ray 无法自动恢复该对象以供尝试访问该对象的用户使用。创建此类生命周期过长对象的一个示例是从任务返回 ray.put()
创建的 ObjectRef
import ray
# Non-fault tolerant version:
@ray.remote
def a():
x_ref = ray.put(1)
return x_ref
x_ref = ray.get(a.remote())
# Object x outlives its owner task A.
try:
# If owner of x (i.e. the worker process running task A) dies,
# the application can no longer get value of x.
print(ray.get(x_ref))
except ray.exceptions.OwnerDiedError:
pass
在前面的示例中,对象 x
的生命周期长于其所有者任务 a
。如果运行任务 a
的 worker 进程失败,之后调用 ray.get
获取 x_ref
将导致 OwnerDiedError
异常。
以下示例是容错版本,它直接返回 x
。在此示例中,driver 拥有 x
,并且你仅在 driver 的生命周期内访问它。如果 x
丢失,Ray 可以通过血缘重建自动恢复它。更多详情请参阅反模式:从任务返回 ray.put() ObjectRef 会损害性能和容错性。
# Fault tolerant version:
@ray.remote
def a():
# Here we return the value directly instead of calling ray.put() first.
return 1
# The owner of x is the driver
# so x is accessible and can be auto recovered
# during the entire lifetime of the driver.
x_ref = a.remote()
print(ray.get(x_ref))
第三,避免使用只有特定节点才能满足的自定义资源需求。如果该特定节点发生故障,Ray 将不会重试正在运行的任务或 Actor。
@ray.remote
def b():
return 1
# If the node with ip 127.0.0.3 fails while task b is running,
# Ray cannot retry the task on other nodes.
b.options(resources={"node:127.0.0.3": 1}).remote()
如果你倾向于在特定节点上运行任务,可以使用 NodeAffinitySchedulingStrategy
。它允许你将亲和性指定为软约束,这样即使目标节点发生故障,任务仍可以在其他节点上重试。
# Prefer running on the particular node specified by node id
# but can also run on other nodes if the target node fails.
b.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(), soft=True
)
).remote()