Task Fault Tolerance

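Tasks can fail because of application-level errors (for example, Python-level exceptions) or system-level failures (for example, a machine failure). This page describes the mechanisms that application developers can use to recover from these errors.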

Catching application-level failures

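Ray surfaces application-level failures as Python-level exceptions. When a task on a remote worker or actor fails due to a Python-level exception, Ray wraps the original exception in a RayTaskError and stores it as the task's return value. This wrapped exception is thrown to any worker that tries to get the result, either by calling ray.get or when the worker is executing another task that depends on the object. If the user's exception type can be subclassed, the raised exception is an instance of both RayTaskError and the user's exception type, so the user can catch either of them. Otherwise, the wrapped exception is just a RayTaskError, and the actual user exception type can be accessed through the cause field of the RayTaskError.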


import ray

@ray.remote
def f():
    raise Exception("the real error")

@ray.remote
def g(x):
    return


try:
    ray.get(f.remote())
except ray.exceptions.RayTaskError as e:
    print(e)
    # ray::f() (pid=71867, ip=XXX.XX.XXX.XX)
    #   File "errors.py", line 5, in f
    #     raise Exception("the real error")
    # Exception: the real error

try:
    ray.get(g.remote(f.remote()))
except ray.exceptions.RayTaskError as e:
    print(e)
    # ray::g() (pid=73085, ip=128.32.132.47)
    #   At least one of the input arguments for this task could not be computed:
    # ray.exceptions.RayTaskError: ray::f() (pid=73085, ip=XXX.XX.XXX.XX)
    #   File "errors.py", line 5, in f
    #     raise Exception("the real error")
    # Exception: the real error

Example code for catching the user exception type when the exception type can be subclassed:


class MyException(Exception):
    ...

@ray.remote
def raises_my_exc():
    raise MyException("a user exception")

try:
    ray.get(raises_my_exc.remote())
except MyException as e:
    print(e)
    # ray::raises_my_exc() (pid=15329, ip=127.0.0.1)
    #   File "<$PWD>/task_exceptions.py", line 45, in raises_my_exc
    #     raise MyException("a user exception")
    # MyException: a user exception

Example code for accessing the user exception type when the exception type cannot be subclassed:

class MyFinalException(Exception):
    def __init_subclass__(cls, /, *args, **kwargs):
        raise TypeError("Cannot subclass this little exception class.")

@ray.remote
def raises_my_final_exc():
    raise MyFinalException("a *final* user exception")

try:
    ray.get(raises_my_final_exc.remote())
except ray.exceptions.RayTaskError as e:
    assert isinstance(e.cause, MyFinalException)
    print(e)
    # 2024-04-08 21:11:47,417 WARNING exceptions.py:177 -- User exception type <class '__main__.MyFinalException'> in RayTaskError can not be subclassed! This exception will be raised as RayTaskError only. You can use `ray_task_error.cause` to access the user exception. Failure in subclassing: Cannot subclass this little exception class.
    # ray::raises_my_final_exc() (pid=88226, ip=127.0.0.1)
    # File "<$PWD>/task_exceptions.py", line 66, in raises_my_final_exc
    #     raise MyFinalException("a *final* user exception")
    # MyFinalException: a *final* user exception
    print(type(e.cause))
    # <class '__main__.MyFinalException'>
    print(e.cause)
    # a *final* user exception

If Ray can't serialize the user's exception, it converts the exception to a RayError.


import threading

class UnserializableException(Exception):
    def __init__(self):
        self.lock = threading.Lock()

@ray.remote
def raise_unserializable_error():
    raise UnserializableException

try:
    ray.get(raise_unserializable_error.remote())
except ray.exceptions.RayTaskError as e:
    print(e)
    # ray::raise_unserializable_error() (pid=328577, ip=172.31.5.154)
    #   File "/home/ubuntu/ray/tmp~/main.py", line 25, in raise_unserializable_error
    #     raise UnserializableException
    # UnserializableException
    print(type(e.cause))
    # <class 'ray.exceptions.RayError'>
    print(e.cause)
    # The original cause of the RayTaskError (<class '__main__.UnserializableException'>) isn't serializable: cannot pickle '_thread.lock' object. Overwriting the cause to a RayError.

Use ray list tasks from the State API CLI to query task exit details:

# This API is only available when you download Ray via `pip install "ray[default]"`
ray list tasks
======== List: 2023-05-26 10:32:00.962610 ========
Stats:
------------------------------
Total: 3

Table:
------------------------------
    TASK_ID                                             ATTEMPT_NUMBER  NAME    STATE      JOB_ID  ACTOR_ID    TYPE         FUNC_OR_CLASS_NAME    PARENT_TASK_ID                                    NODE_ID                                                   WORKER_ID                                                 ERROR_TYPE
 0  16310a0f0a45af5cffffffffffffffffffffffff01000000                 0  f       FAILED   01000000              NORMAL_TASK  f                     ffffffffffffffffffffffffffffffffffffffff01000000  767bd47b72efb83f33dda1b661621cce9b969b4ef00788140ecca8ad  b39e3c523629ab6976556bd46be5dbfbf319f0fce79a664122eb39a9  TASK_EXECUTION_EXCEPTION
 1  c2668a65bda616c1ffffffffffffffffffffffff01000000                 0  g       FAILED   01000000              NORMAL_TASK  g                     ffffffffffffffffffffffffffffffffffffffff01000000  767bd47b72efb83f33dda1b661621cce9b969b4ef00788140ecca8ad  b39e3c523629ab6976556bd46be5dbfbf319f0fce79a664122eb39a9  TASK_EXECUTION_EXCEPTION
 2  c8ef45ccd0112571ffffffffffffffffffffffff01000000                 0  f       FAILED   01000000              NORMAL_TASK  f                     ffffffffffffffffffffffffffffffffffffffff01000000  767bd47b72efb83f33dda1b661621cce9b969b4ef00788140ecca8ad  b39e3c523629ab6976556bd46be5dbfbf319f0fce79a664122eb39a9  TASK_EXECUTION_EXCEPTION

Retrying failed tasks

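While a worker is executing a task, if the worker dies unexpectedly, either because the process crashed or because the machine failed, Ray reruns the task until either the task succeeds or the maximum number of retries is exceeded. The default number of retries is 3 and can be overridden by specifying max_retries in the @ray.remote decorator. Specifying -1 allows infinite retries, and 0 disables retries. To override the default number of retries for all tasks submitted, set the OS environment variable RAY_TASK_MAX_RETRIES, for example by passing it to your driver script or by using runtime environments.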

You can experiment with this behavior by running the following code.

import numpy as np
import os
import ray
import time

ray.init(ignore_reinit_error=True)

@ray.remote(max_retries=1)
def potentially_fail(failure_probability):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)
    return 0

for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(potentially_fail.remote(0.5))
        print('SUCCESS')
    except ray.exceptions.WorkerCrashedError:
        print('FAILURE')

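When a task returns a result in the Ray object store, the resulting object may be lost after the original task has already finished. In these cases, Ray also tries to automatically recover the object by re-executing the tasks that created it. This can be configured through the same max_retries option described here. See object fault tolerance for more information.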

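By default, Ray does not retry tasks when application code throws an exception. However, you can control whether application-level errors are retried, and even which application-level errors are retried, through the retry_exceptions argument. This argument is False by default. To enable retries for application-level errors, set retry_exceptions=True to retry on any exception, or pass a list of retryable exceptions. An example is shown below.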

import numpy as np
import os
import ray
import time

ray.init(ignore_reinit_error=True)

class RandomError(Exception):
    pass

@ray.remote(max_retries=1, retry_exceptions=True)
def potentially_fail(failure_probability):
    if failure_probability < 0 or failure_probability > 1:
        raise ValueError(
            "failure_probability must be between 0 and 1, but got: "
            f"{failure_probability}"
        )
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        raise RandomError("Failed!")
    return 0

for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(potentially_fail.remote(0.5))
        print('SUCCESS')
    except RandomError:
        print('FAILURE')

# Provide the exceptions that we want to retry as an allowlist.
retry_on_exception = potentially_fail.options(retry_exceptions=[RandomError])
try:
    # This will fail since we're passing in -1 for the failure_probability,
    # which will raise a ValueError in the task and does not match the RandomError
    # exception that we provided.
    ray.get(retry_on_exception.remote(-1))
except ValueError:
    print("FAILED AS EXPECTED")
else:
    raise RuntimeError("An exception should be raised so this shouldn't be reached.")

# These will retry on the RandomError exception.
for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(retry_on_exception.remote(0.5))
        print('SUCCESS')
    except RandomError:
        print('FAILURE AFTER RETRIES')

Use ray list tasks -f task_id=<task_id> from the State API CLI to see task attempt failures and retries:

# This API is only available when you download Ray via `pip install "ray[default]"`
ray list tasks -f task_id=16310a0f0a45af5cffffffffffffffffffffffff01000000
======== List: 2023-05-26 10:38:08.809127 ========
Stats:
------------------------------
Total: 2

Table:
------------------------------
    TASK_ID                                             ATTEMPT_NUMBER  NAME              STATE       JOB_ID  ACTOR_ID    TYPE         FUNC_OR_CLASS_NAME    PARENT_TASK_ID                                    NODE_ID                                                   WORKER_ID                                                 ERROR_TYPE
 0  16310a0f0a45af5cffffffffffffffffffffffff01000000                 0  potentially_fail  FAILED    01000000              NORMAL_TASK  potentially_fail      ffffffffffffffffffffffffffffffffffffffff01000000  94909e0958e38d10d668aa84ed4143d0bf2c23139ae1a8b8d6ef8d9d  b36d22dbf47235872ad460526deaf35c178c7df06cee5aa9299a9255  WORKER_DIED
 1  16310a0f0a45af5cffffffffffffffffffffffff01000000                 1  potentially_fail  FINISHED  01000000              NORMAL_TASK  potentially_fail      ffffffffffffffffffffffffffffffffffffffff01000000  94909e0958e38d10d668aa84ed4143d0bf2c23139ae1a8b8d6ef8d9d  22df7f2a9c68f3db27498f2f435cc18582de991fbcaf49ce0094ddb0

Cancelling misbehaving tasks

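If a task is hanging, you may want to cancel it so that you can continue to make progress. You can do this by calling ray.cancel on an ObjectRef returned by the task. By default, this sends a KeyboardInterrupt to the task's worker if the task is mid-execution. Passing force=True to ray.cancel force-exits the worker. See the API reference for more details on ray.cancel.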

Note that currently, Ray does not automatically retry tasks that have been cancelled.

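Sometimes, application-level code may cause memory leaks on a worker after repeated task executions, for example because of bugs in third-party libraries. To make progress in these cases, you can set the max_calls option in a task's @ray.remote decorator. Once a worker has executed this many invocations of the given remote function, it automatically exits. By default, max_calls is set to infinity.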