Actor 容错#

如果 actor 进程死亡,或者 actor 的所有者 (owner) 死亡,actor 可能会失败。Actor 的所有者是最初通过调用 ActorClass.remote() 创建该 actor 的 worker。分离式 actor (Detached actors) 没有所有者进程,并在 Ray 集群销毁时被清理。

Actor 进程失败#

Ray 可以自动重启意外崩溃的 actor。此行为由 max_restarts 控制,该参数设置 actor 将被重启的最大次数。 max_restarts 的默认值为 0,表示 actor 不会被重启。如果设置为 -1,actor 将会被无限次重启。actor 重启时,其状态将通过重新运行其构造函数来重新创建。达到指定的重启次数后,后续的 actor 方法将引发 RayActorError

默认情况下,actor 任务以至多执行一次 (at-most-once) 语义执行(在 @ray.remote 装饰器 中设置 max_task_retries=0)。这意味着如果提交的任务到达了一个不可达的 actor,Ray 将通过 RayActorError 报告错误,这是一个 Python 级别的异常,当对任务返回的 future 调用 ray.get 时会抛出。请注意,即使任务确实成功执行了,也可能抛出此异常。例如,如果 actor 在执行完任务后立即死亡,就会发生这种情况。

Ray 还为 actor 任务提供了至少执行一次 (at-least-once) 执行语义(max_task_retries=-1max_task_retries > 0)。这意味着如果提交的任务到达了一个不可达的 actor,系统会自动重试该任务。启用此选项后,系统仅在发生以下情况之一时向应用抛出 RayActorError:(1) actor 的 max_restarts 限制已超出,无法再重启 actor,或 (2) 此特定任务的 max_task_retries 限制已超出。请注意,如果 actor 在任务提交时正在重启,这将被计为一次重试。可以使用 max_task_retries = -1 将重试限制设置为无限。

您可以通过运行以下代码来体验此行为。

import os
import ray

ray.init()

# This actor kills itself after executing 10 tasks.
@ray.remote(max_restarts=4, max_task_retries=-1)
class Actor:
    def __init__(self):
        self.counter = 0

    def increment_and_possibly_fail(self):
        # Exit after every 10 tasks.
        if self.counter == 10:
            os._exit(0)
        self.counter += 1
        return self.counter

actor = Actor.remote()

# The actor will be reconstructed up to 4 times, so we can execute up to 50
# tasks successfully. The actor is reconstructed by rerunning its constructor.
# Methods that were executing when the actor died will be retried and will not
# raise a `RayActorError`. Retried methods may execute twice, once on the
# failed actor and a second time on the restarted actor.
for _ in range(50):
    counter = ray.get(actor.increment_and_possibly_fail.remote())
    print(counter)  # Prints the sequence 1-10 5 times.

# After the actor has been restarted 4 times, all subsequent methods will
# raise a `RayActorError`.
for _ in range(10):
    try:
        counter = ray.get(actor.increment_and_possibly_fail.remote())
        print(counter)  # Unreachable.
    except ray.exceptions.RayActorError:
        print("FAILURE")  # Prints 10 times.

对于至少执行一次 (at-least-once) 的 actor,系统仍会根据初始提交顺序保证执行顺序。例如,失败的 actor 任务之后提交的任何任务都不会在该 actor 上执行,直到失败的任务成功重试。系统不会尝试重新执行故障前已成功执行的任何任务(除非 max_task_retries 非零且任务需要用于对象重建)。

注意

对于异步或线程化 actor任务可能会乱序执行。Actor 重启时,系统仅会重试未完成的任务。先前已完成的任务不会被重新执行。

至少执行一次 (at-least-once) 执行最适合只读 actor 或状态易失且无需在失败后重建的 actor。对于具有关键状态的 actor,应用需要负责恢复状态,例如,通过定期进行检查点并在 actor 重启时从检查点恢复。

Actor 检查点#

max_restarts 会自动重启崩溃的 actor,但它不会自动恢复 actor 中的应用级别状态。您应该手动对 actor 的状态进行检查点,并在 actor 重启时恢复。

对于手动重启的 actor,actor 的创建者应管理检查点,并在失败时手动重启和恢复 actor。如果您希望创建者决定何时重启 actor 和/或创建者与其他执行协调 actor 检查点,建议采用此方法。

import os
import sys
import ray
import json
import tempfile
import shutil


@ray.remote(num_cpus=1)
class Worker:
    def __init__(self):
        self.state = {"num_tasks_executed": 0}

    def execute_task(self, crash=False):
        if crash:
            sys.exit(1)

        # Execute the task
        # ...

        # Update the internal state
        self.state["num_tasks_executed"] = self.state["num_tasks_executed"] + 1

    def checkpoint(self):
        return self.state

    def restore(self, state):
        self.state = state


class Controller:
    def __init__(self):
        self.worker = Worker.remote()
        self.worker_state = ray.get(self.worker.checkpoint.remote())

    def execute_task_with_fault_tolerance(self):
        i = 0
        while True:
            i = i + 1
            try:
                ray.get(self.worker.execute_task.remote(crash=(i % 2 == 1)))
                # Checkpoint the latest worker state
                self.worker_state = ray.get(self.worker.checkpoint.remote())
                return
            except ray.exceptions.RayActorError:
                print("Actor crashes, restarting...")
                # Restart the actor and restore the state
                self.worker = Worker.remote()
                ray.get(self.worker.restore.remote(self.worker_state))


controller = Controller()
controller.execute_task_with_fault_tolerance()
controller.execute_task_with_fault_tolerance()
assert ray.get(controller.worker.checkpoint.remote())["num_tasks_executed"] == 2

或者,如果您使用 Ray 的自动 actor 重启功能,actor 可以在构造函数中手动进行检查点并从检查点恢复。

@ray.remote(max_restarts=-1, max_task_retries=-1)
class ImmortalActor:
    def __init__(self, checkpoint_file):
        self.checkpoint_file = checkpoint_file

        if os.path.exists(self.checkpoint_file):
            # Restore from a checkpoint
            with open(self.checkpoint_file, "r") as f:
                self.state = json.load(f)
        else:
            self.state = {}

    def update(self, key, value):
        import random

        if random.randrange(10) < 5:
            sys.exit(1)

        self.state[key] = value

        # Checkpoint the latest state
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.state, f)

    def get(self, key):
        return self.state[key]


checkpoint_dir = tempfile.mkdtemp()
actor = ImmortalActor.remote(os.path.join(checkpoint_dir, "checkpoint.json"))
ray.get(actor.update.remote("1", 1))
ray.get(actor.update.remote("2", 2))
assert ray.get(actor.get.remote("1")) == 1
shutil.rmtree(checkpoint_dir)

注意

如果检查点保存到外部存储,请确保整个集群都可以访问它,因为 actor 可以在不同的节点上重启。例如,将检查点保存到云存储(如 S3)或共享目录(如通过 NFS)。

Actor 创建者失败#

对于非分离式 actor (non-detached actors),actor 的所有者是创建它的 worker,即调用 ActorClass.remote() 的 worker。与对象类似,如果 actor 的所有者死亡,则 actor 也会与所有者“同生共死”。Ray 不会自动恢复所有者已死的 actor,即使其 max_restarts 非零。

由于分离式 actor (detached actors) 没有所有者,即使它们的原始创建者死亡,它们仍会由 Ray 重启。分离式 actor 会继续自动重启,直到超过最大重启次数、actor 被销毁或 Ray 集群被销毁。

您可以在以下代码中试用此行为。

import ray
import os
import signal
ray.init()

@ray.remote(max_restarts=-1)
class Actor:
    def ping(self):
        return "hello"

@ray.remote
class Parent:
    def generate_actors(self):
        self.child = Actor.remote()
        self.detached_actor = Actor.options(name="actor", lifetime="detached").remote()
        return self.child, self.detached_actor, os.getpid()

parent = Parent.remote()
actor, detached_actor, pid = ray.get(parent.generate_actors.remote())

os.kill(pid, signal.SIGKILL)

try:
    print("actor.ping:", ray.get(actor.ping.remote()))
except ray.exceptions.RayActorError as e:
    print("Failed to submit actor call", e)
# Failed to submit actor call The actor died unexpectedly before finishing this task.
# 	class_name: Actor
# 	actor_id: 56f541b178ff78470f79c3b601000000
# 	namespace: ea8b3596-7426-4aa8-98cc-9f77161c4d5f
# The actor is dead because because all references to the actor were removed.

try:
    print("detached_actor.ping:", ray.get(detached_actor.ping.remote()))
except ray.exceptions.RayActorError as e:
    print("Failed to submit detached actor call", e)
# detached_actor.ping: hello

强制杀死行为异常的 actor#

有时应用层代码可能导致 actor 挂起或资源泄漏。在这种情况下,Ray 允许您通过手动终止 actor 来从故障中恢复。您可以通过对 actor 的任何句柄调用 ray.kill 来实现这一点。请注意,不必是 actor 的原始句柄。

如果设置了 max_restarts,您还可以通过将 no_restart=False 传递给 ray.kill 来允许 Ray 自动重启 actor。

不可用 actor#

当 actor 无法接受方法调用时,对方法返回的对象引用调用 ray.get 可能会引发 ActorUnavailableError。此异常表明 actor 当前不可访问,但在等待和重试后可能恢复。典型情况包括:

  • actor 正在重启。例如,它可能正在等待资源,或在重启期间运行类构造函数。

  • actor 正在经历短暂的网络问题,例如连接中断。

  • actor 已死亡,但死亡尚未报告给系统。

actor 方法调用执行至多一次 (at-most-once)。当 ray.get() 调用引发 ActorUnavailableError 异常时,无法保证 actor 是否执行了该任务。如果方法具有副作用,这些副作用可能可观察,也可能不可观察。Ray 保证该方法不会被执行两次,除非 actor 或方法配置了重试,如下一节所述。

该 actor 在后续调用中可能恢复,也可能不恢复。如果 actor 确认已死亡,后续调用可能会引发 ActorDiedError;如果仍然不可达,则可能引发 ActorUnavailableError;如果 actor 恢复,则可能正常返回结果。

作为最佳实践,如果调用者遇到 ActorUnavailableError 错误,应“隔离”该 actor,停止向其发送流量。然后可以定期 ping 该 actor,直到它引发 ActorDiedError 或返回 OK。

如果任务的 max_task_retries > 0 并收到了 ActorUnavailableError,Ray 将重试该任务最多 max_task_retries 次。如果 actor 在其构造函数中重启,任务重试将失败,消耗一次重试计数。如果仍有剩余重试次数,Ray 将在 RAY_task_retry_delay_ms 后再次重试,直到所有重试次数耗尽或 actor 准备好接受任务。如果构造函数运行时间较长,请考虑增加 max_task_retriesRAY_task_retry_delay_ms

Actor 方法异常#

有时您希望在 actor 方法引发异常时重试。使用 max_task_retries 配合 retry_exceptions 进行重试。

请注意,默认情况下,对用户引发的异常进行重试是禁用的。要启用它,请确保方法是幂等 (idempotent) 的,即多次调用它应该等同于只调用一次。

您可以在 @ray.method(retry_exceptions=...) 装饰器中设置 retry_exceptions,或在方法调用中的 .options(retry_exceptions=...) 中设置。

重试行为取决于您为 retry_exceptions 设置的值: - retry_exceptions == False (默认):不对用户异常进行重试。 - retry_exceptions == True:Ray 对用户异常进行重试,最多 max_task_retries 次。 - retry_exceptions 是一个异常列表:Ray 仅当方法引发了来自这些特定类的异常时,才会对其进行重试,最多 max_task_retries 次。

max_task_retries 适用于异常和 actor 崩溃。Ray actor 可以设置此选项以应用于其所有方法。方法也可以为自己设置一个覆盖选项。Ray 按照以下顺序搜索 max_task_retries 的第一个非默认值:

  • 方法调用的值,例如 actor.method.options(max_task_retries=2)。如果您未设置此值,Ray 将忽略它。

  • 方法定义的值,例如 @ray.method(max_task_retries=2)。如果您未设置此值,Ray 将忽略它。

  • actor 创建调用的值,例如 Actor.options(max_task_retries=2)。如果您未设置此值,Ray 将忽略它。

  • Actor 类定义的值,例如 @ray.remote(max_task_retries=2) 装饰器。如果您未设置此值,Ray 将忽略它。

  • 默认值,`0`。

例如,如果一个方法设置了 max_task_retries=5retry_exceptions=True,而 actor 设置了 max_restarts=2,Ray 将执行该方法最多 6 次:一次初始调用,以及 5 次额外重试。这 6 次调用可能包括 2 次 actor 崩溃。在第 6 次调用之后,对结果 Ray ObjectRef 调用 ray.get 将引发最后一次调用中抛出的异常,或者如果 actor 在最后一次调用中崩溃,则引发 ray.exceptions.RayActorError