Task Fault Tolerance
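Tasks can fail due to application-level errors (for example, Python-level exceptions) or system-level failures (for example, machine failures). This section describes the mechanisms an application developer can use to recover from these errors.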
Catching application-level failures
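Ray surfaces application-level failures as Python-level exceptions. When a task on a remote worker or actor fails due to a Python-level exception, Ray wraps the original exception in a `RayTaskError` and stores it as the task's return value. Any worker that tries to get the result, either by calling `ray.get` or by executing another task that depends on the object, raises this wrapped exception. If the user's exception type can be subclassed, the raised exception is an instance of both `RayTaskError` and the user's exception type, so you can catch either one. Otherwise, the wrapped exception is only a `RayTaskError`, and the actual user exception is available through the `cause` field of the `RayTaskError`.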
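To make the "instance of both types" behavior concrete, here is a minimal plain-Python sketch of one way such a dual-typed wrapper can be built, assuming a dynamically created class that inherits from both a wrapper type and the user's type. `TaskError` and `wrap` are hypothetical names standing in for Ray's `RayTaskError` machinery; this is not Ray's implementation.

```python
class TaskError(Exception):
    """Hypothetical stand-in for ray.exceptions.RayTaskError."""

    def __init__(self, function_name, cause):
        # Call Exception.__init__ directly so the user type's __init__
        # (which may take different arguments) is never invoked.
        Exception.__init__(self, f"{function_name}() failed: {cause!r}")
        self.function_name = function_name
        self.cause = cause


def wrap(function_name, cause):
    """Return a TaskError that is also an instance of type(cause) when the
    user's exception type can be subclassed, else a plain TaskError."""
    user_cls = type(cause)
    try:
        # A new class deriving from both the wrapper and the user's type.
        dual_cls = type(f"TaskError({user_cls.__name__})", (TaskError, user_cls), {})
    except TypeError:
        # The user's type refuses subclassing (e.g. via __init_subclass__).
        return TaskError(function_name, cause)
    return dual_cls(function_name, cause)


class MyException(Exception):
    pass


class MyFinalException(Exception):
    def __init_subclass__(cls, /, *args, **kwargs):
        raise TypeError("Cannot subclass this little exception class.")


try:
    raise wrap("f", MyException("a user exception"))
except MyException as err:  # caught by the user's type...
    assert isinstance(err, TaskError)  # ...and it is also a TaskError
    print(type(err).__name__)  # prints TaskError(MyException)

final = wrap("g", MyFinalException("a *final* user exception"))
print(type(final).__name__, type(final.cause).__name__)  # prints TaskError MyFinalException
```

The fallback branch mirrors the two cases the text describes: when subclassing fails, callers can only catch the wrapper type and must inspect `cause`.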
```python
import ray


@ray.remote
def f():
    raise Exception("the real error")


@ray.remote
def g(x):
    return


try:
    ray.get(f.remote())
except ray.exceptions.RayTaskError as e:
    print(e)
    # ray::f() (pid=71867, ip=XXX.XX.XXX.XX)
    #   File "errors.py", line 5, in f
    #     raise Exception("the real error")
    # Exception: the real error

try:
    ray.get(g.remote(f.remote()))
except ray.exceptions.RayTaskError as e:
    print(e)
    # ray::g() (pid=73085, ip=XXX.XX.XXX.XX)
    #   At least one of the input arguments for this task could not be computed:
    # ray.exceptions.RayTaskError: ray::f() (pid=73085, ip=XXX.XX.XXX.XX)
    #   File "errors.py", line 5, in f
    #     raise Exception("the real error")
    # Exception: the real error
```
Example code that catches the user exception type when the exception type can be subclassed:
```python
class MyException(Exception):
    ...


@ray.remote
def raises_my_exc():
    raise MyException("a user exception")


try:
    ray.get(raises_my_exc.remote())
except MyException as e:
    print(e)
    # ray::raises_my_exc() (pid=15329, ip=127.0.0.1)
    #   File "<$PWD>/task_exceptions.py", line 45, in raises_my_exc
    #     raise MyException("a user exception")
    # MyException: a user exception
```
Example code that accesses the user exception type when the exception type can't be subclassed:
```python
class MyFinalException(Exception):
    def __init_subclass__(cls, /, *args, **kwargs):
        raise TypeError("Cannot subclass this little exception class.")


@ray.remote
def raises_my_final_exc():
    raise MyFinalException("a *final* user exception")


try:
    ray.get(raises_my_final_exc.remote())
except ray.exceptions.RayTaskError as e:
    assert isinstance(e.cause, MyFinalException)
    print(e)
    # 2024-04-08 21:11:47,417 WARNING exceptions.py:177 -- User exception type <class '__main__.MyFinalException'> in RayTaskError can not be subclassed! This exception will be raised as RayTaskError only. You can use `ray_task_error.cause` to access the user exception. Failure in subclassing: Cannot subclass this little exception class.
    # ray::raises_my_final_exc() (pid=88226, ip=127.0.0.1)
    #   File "<$PWD>/task_exceptions.py", line 66, in raises_my_final_exc
    #     raise MyFinalException("a *final* user exception")
    # MyFinalException: a *final* user exception
    print(type(e.cause))
    # <class '__main__.MyFinalException'>
    print(e.cause)
    # a *final* user exception
```
If Ray can't serialize the user's exception, it converts the exception into a `RayError`.
```python
import threading


class UnserializableException(Exception):
    def __init__(self):
        self.lock = threading.Lock()


@ray.remote
def raise_unserializable_error():
    raise UnserializableException


try:
    ray.get(raise_unserializable_error.remote())
except ray.exceptions.RayTaskError as e:
    print(e)
    # ray::raise_unserializable_error() (pid=328577, ip=172.31.5.154)
    #   File "/home/ubuntu/ray/tmp~/main.py", line 25, in raise_unserializable_error
    #     raise UnserializableException
    # UnserializableException
    print(type(e.cause))
    # <class 'ray.exceptions.RayError'>
    print(e.cause)
    # The original cause of the RayTaskError (<class '__main__.UnserializableException'>) isn't serializable: cannot pickle '_thread.lock' object. Overwriting the cause to a RayError.
```
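The fallback above can be modeled in plain Python: attempt a serialization round trip and, if it fails, replace the cause with a generic error that records why. This sketch assumes the standard library `pickle` as a stand-in for Ray's serializer. `SerializationFallbackError` and `picklable_cause` are hypothetical names, not Ray APIs.

```python
import pickle
import threading


class SerializationFallbackError(Exception):
    """Hypothetical stand-in for ray.exceptions.RayError in this sketch."""


def picklable_cause(exc):
    """Return exc if it survives a pickle round trip; otherwise return a
    fallback error describing the original type and the failure."""
    try:
        pickle.loads(pickle.dumps(exc))
    except Exception as err:
        return SerializationFallbackError(
            f"The original cause ({type(exc)}) isn't serializable: {err}. "
            "Overwriting the cause."
        )
    return exc


class UnserializableException(Exception):
    def __init__(self):
        self.lock = threading.Lock()  # lock objects can't be pickled


cause = picklable_cause(UnserializableException())
print(type(cause).__name__)  # prints SerializationFallbackError
```

Checking with a full round trip (rather than only `dumps`) also catches exceptions that pickle fine but can't be rebuilt on the receiving side.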
Use `ray list tasks` from the State API CLI to query task exit details:
```shell
# This API is only available when you download Ray via `pip install "ray[default]"`
ray list tasks
```

```text
======== List: 2023-05-26 10:32:00.962610 ========
Stats:
------------------------------
Total: 3
Table:
------------------------------
TASK_ID ATTEMPT_NUMBER NAME STATE JOB_ID ACTOR_ID TYPE FUNC_OR_CLASS_NAME PARENT_TASK_ID NODE_ID WORKER_ID ERROR_TYPE
0 16310a0f0a45af5cffffffffffffffffffffffff01000000 0 f FAILED 01000000 NORMAL_TASK f ffffffffffffffffffffffffffffffffffffffff01000000 767bd47b72efb83f33dda1b661621cce9b969b4ef00788140ecca8ad b39e3c523629ab6976556bd46be5dbfbf319f0fce79a664122eb39a9 TASK_EXECUTION_EXCEPTION
1 c2668a65bda616c1ffffffffffffffffffffffff01000000 0 g FAILED 01000000 NORMAL_TASK g ffffffffffffffffffffffffffffffffffffffff01000000 767bd47b72efb83f33dda1b661621cce9b969b4ef00788140ecca8ad b39e3c523629ab6976556bd46be5dbfbf319f0fce79a664122eb39a9 TASK_EXECUTION_EXCEPTION
2 c8ef45ccd0112571ffffffffffffffffffffffff01000000 0 f FAILED 01000000 NORMAL_TASK f ffffffffffffffffffffffffffffffffffffffff01000000 767bd47b72efb83f33dda1b661621cce9b969b4ef00788140ecca8ad b39e3c523629ab6976556bd46be5dbfbf319f0fce79a664122eb39a9 TASK_EXECUTION_EXCEPTION
```
Retrying failed tasks
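If a worker dies unexpectedly while executing a task, whether because the process crashed or because the machine failed, Ray reruns the task until it either succeeds or exceeds the maximum number of retries. The default number of retries is 3, and you can override it by specifying `max_retries` in the `@ray.remote` decorator. Specifying -1 allows infinite retries, and 0 disables retries. To override the default number of retries for all submitted tasks, set the OS environment variable `RAY_TASK_MAX_RETRIES`, for example by passing it to your driver script or by using runtime environments.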
You can experiment with this behavior by running the following code.
```python
import numpy as np
import os
import ray
import time

ray.init(ignore_reinit_error=True)


@ray.remote(max_retries=1)
def potentially_fail(failure_probability):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)
    return 0


for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(potentially_fail.remote(0.5))
        print('SUCCESS')
    except ray.exceptions.WorkerCrashedError:
        print('FAILURE')
```
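The retry budget semantics described above (`max_retries` retries after the first attempt, -1 for unlimited, 0 for none) can be captured in a small plain-Python loop. This is a local model for intuition, not Ray's scheduler. `WorkerCrashed`, `run_with_retries`, and `make_flaky` are hypothetical names, and attempts are numbered from 0, as in the `ATTEMPT_NUMBER` column of `ray list tasks`.

```python
class WorkerCrashed(Exception):
    """Hypothetical stand-in for a worker process dying mid-task."""


def run_with_retries(task, max_retries=3):
    """Call task(attempt_number) until it succeeds or retries run out.

    max_retries=-1 retries forever; max_retries=0 allows a single attempt.
    """
    attempt = 0
    while True:
        try:
            return task(attempt)
        except WorkerCrashed:
            # `attempt` equals the number of retries already spent.
            if max_retries != -1 and attempt >= max_retries:
                raise
            attempt += 1


def make_flaky(crashes_before_success):
    """Build a task that 'crashes' on its first N attempts, then succeeds."""
    def task(attempt):
        if attempt < crashes_before_success:
            raise WorkerCrashed(f"attempt {attempt} crashed")
        return f"succeeded on attempt {attempt}"
    return task


print(run_with_retries(make_flaky(2), max_retries=3))  # succeeded on attempt 2
```

With `max_retries=1`, as in the Ray example, a task gets at most two attempts in total, so `make_flaky(2)` would exhaust the budget and the final `WorkerCrashed` propagates to the caller.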
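When a task returns a result in the Ray object store, the resulting object can be lost after the original task has already finished. In these cases, Ray also tries to recover the object automatically by re-executing the task that created it. You configure this with the same `max_retries` option described here. See Object fault tolerance for more information.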
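Recovering an object by re-execution depends on remembering which task produced it (its lineage). The toy store below is a hypothetical, single-process sketch of that idea. It assumes the reconstruction budget is a per-task `max_retries` value, as described above. `ToyObjectStore` and `ObjectLost` are illustrative names, not Ray classes.

```python
class ObjectLost(Exception):
    """Raised when an object is gone and its reconstruction budget is spent."""


class ToyObjectStore:
    """Single-process model of lineage-based object recovery."""

    def __init__(self):
        self._values = {}           # object id -> value
        self._lineage = {}          # object id -> (fn, args, max_retries)
        self._reconstructions = {}  # object id -> re-executions so far
        self.executions = 0

    def submit(self, object_id, fn, *args, max_retries=3):
        # Record the creating task before running it, so it can be replayed.
        self._lineage[object_id] = (fn, args, max_retries)
        self._reconstructions[object_id] = 0
        self._execute(object_id)

    def _execute(self, object_id):
        fn, args, _ = self._lineage[object_id]
        self.executions += 1
        self._values[object_id] = fn(*args)

    def lose(self, object_id):
        # Simulate the stored copy disappearing, e.g. with a failed node.
        self._values.pop(object_id, None)

    def get(self, object_id):
        if object_id not in self._values:
            _, _, max_retries = self._lineage[object_id]
            used = self._reconstructions[object_id]
            if max_retries != -1 and used >= max_retries:
                raise ObjectLost(object_id)
            self._reconstructions[object_id] = used + 1
            self._execute(object_id)  # re-run the task that created it
        return self._values[object_id]


store = ToyObjectStore()
store.submit("squared", pow, 7, 2, max_retries=1)
store.lose("squared")
print(store.get("squared"), store.executions)  # 49 2
```

Replaying the task only makes sense if the task is deterministic (or at least safe to run again), which is why lineage-based recovery works best with side-effect-free tasks.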
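By default, Ray doesn't retry a task when the application code raises an exception. However, the `retry_exceptions` argument lets you control whether application-level errors are retried, and even which ones. It defaults to `False`. To enable retries on application-level errors, set `retry_exceptions=True` to retry on any exception, or pass a list of retryable exceptions. An example is shown below.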
```python
import numpy as np
import os
import ray
import time

ray.init(ignore_reinit_error=True)


class RandomError(Exception):
    pass


@ray.remote(max_retries=1, retry_exceptions=True)
def potentially_fail(failure_probability):
    if failure_probability < 0 or failure_probability > 1:
        raise ValueError(
            "failure_probability must be between 0 and 1, but got: "
            f"{failure_probability}"
        )
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        raise RandomError("Failed!")
    return 0


for _ in range(3):
    try:
        # If this task raises, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(potentially_fail.remote(0.5))
        print('SUCCESS')
    except RandomError:
        print('FAILURE')

# Provide the exceptions that we want to retry as an allowlist.
retry_on_exception = potentially_fail.options(retry_exceptions=[RandomError])
try:
    # This will fail since we're passing in -1 for the failure_probability,
    # which will raise a ValueError in the task and does not match the RandomError
    # exception that we provided.
    ray.get(retry_on_exception.remote(-1))
except ValueError:
    print("FAILED AS EXPECTED")
else:
    raise RuntimeError("An exception should be raised so this shouldn't be reached.")

# These will retry on the RandomError exception.
for _ in range(3):
    try:
        # If this task raises RandomError, Ray will retry it up to one
        # additional time. If either of the attempts succeeds, the call to
        # ray.get below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(retry_on_exception.remote(0.5))
        print('SUCCESS')
    except RandomError:
        print('FAILURE AFTER RETRIES')
```
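The retry decision for an application-level exception can be summarized as a small predicate. The sketch below is a model under stated assumptions, not Ray's code. It assumes that list entries match by `isinstance` (so subclasses of a listed exception also count) and that application-level retries draw on the same `max_retries` budget, which matches the comments in the example above. `should_retry` is a hypothetical helper.

```python
def should_retry(exc, retry_exceptions=False, retries_used=0, max_retries=3):
    """Model of whether an application-level exception triggers a retry."""
    # No budget left: never retry (-1 means an unlimited budget).
    if max_retries != -1 and retries_used >= max_retries:
        return False
    if retry_exceptions is True:
        return True  # retry on any exception
    if not retry_exceptions:
        return False  # the default: application errors are not retried
    # An allowlist: retry only instances of the listed exception types.
    return isinstance(exc, tuple(retry_exceptions))


class RandomError(Exception):
    pass


print(should_retry(RandomError(), [RandomError], retries_used=0, max_retries=1))  # True
print(should_retry(ValueError(), [RandomError], retries_used=0, max_retries=1))   # False
```

The second call corresponds to the `remote(-1)` case above: a `ValueError` is not on the allowlist, so the first failure is final.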
Use `ray list tasks -f task_id=<task_id>` from the State API CLI to see failed task attempts and retries:
```shell
# This API is only available when you download Ray via `pip install "ray[default]"`
ray list tasks -f task_id=16310a0f0a45af5cffffffffffffffffffffffff01000000
```

```text
======== List: 2023-05-26 10:38:08.809127 ========
Stats:
------------------------------
Total: 2
Table:
------------------------------
TASK_ID ATTEMPT_NUMBER NAME STATE JOB_ID ACTOR_ID TYPE FUNC_OR_CLASS_NAME PARENT_TASK_ID NODE_ID WORKER_ID ERROR_TYPE
0 16310a0f0a45af5cffffffffffffffffffffffff01000000 0 potentially_fail FAILED 01000000 NORMAL_TASK potentially_fail ffffffffffffffffffffffffffffffffffffffff01000000 94909e0958e38d10d668aa84ed4143d0bf2c23139ae1a8b8d6ef8d9d b36d22dbf47235872ad460526deaf35c178c7df06cee5aa9299a9255 WORKER_DIED
1 16310a0f0a45af5cffffffffffffffffffffffff01000000 1 potentially_fail FINISHED 01000000 NORMAL_TASK potentially_fail ffffffffffffffffffffffffffffffffffffffff01000000 94909e0958e38d10d668aa84ed4143d0bf2c23139ae1a8b8d6ef8d9d 22df7f2a9c68f3db27498f2f435cc18582de991fbcaf49ce0094ddb0
```
Cancelling misbehaving tasks
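If a task is hanging, you may want to cancel it so that your program can continue. You can do this by calling `ray.cancel` on an `ObjectRef` returned by the task. By default, if the task is already executing, this sends a `KeyboardInterrupt` to the task's worker. Passing `force=True` to `ray.cancel` force-exits the worker. See the API reference for more details on `ray.cancel`.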
Note that Ray currently does not automatically retry tasks that have been cancelled.
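The difference between the two cancellation modes can be reproduced locally with a child process standing in for a Ray worker. This POSIX-only sketch is not how Ray delivers cancellation. It assumes `SIGINT` as the mechanism that raises `KeyboardInterrupt` inside the task, and `SIGKILL` (via `Process.kill`) as the analog of `force=True`. `hanging_task` and `cancel` are hypothetical names, and having the child exit with status 130 after handling the interrupt is a simplification of this model.

```python
import multiprocessing
import os
import signal
import sys
import time


def hanging_task(started):
    # Ensure SIGINT raises KeyboardInterrupt even if the parent ignored it.
    signal.signal(signal.SIGINT, signal.default_int_handler)
    try:
        started.set()
        while True:  # simulate a task that never finishes
            time.sleep(0.05)
    except KeyboardInterrupt:
        # Task code gets a chance to clean up before giving up.
        sys.exit(130)


def cancel(force, timeout=5.0):
    """Start a hanging 'worker' process, cancel it, return its exit code."""
    ctx = multiprocessing.get_context("fork")  # POSIX only
    started = ctx.Event()
    worker = ctx.Process(target=hanging_task, args=(started,))
    worker.start()
    started.wait(timeout)  # don't interrupt before the task is running
    if force:
        worker.kill()  # analog of force=True: no task code runs afterwards
    else:
        os.kill(worker.pid, signal.SIGINT)  # analog of the default mode
    worker.join(timeout)
    return worker.exitcode


if __name__ == "__main__":
    print(cancel(force=False))  # 130: the task observed KeyboardInterrupt
    print(cancel(force=True))   # -9: the process was killed by SIGKILL
```

The default mode lets the task react (for example, release resources), while the forced mode gives no such chance, which is why it is the tool of last resort for tasks stuck in code that never checks for interrupts.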
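Sometimes application-level code causes memory leaks on a worker after repeated task executions, for example because of bugs in third-party libraries. To keep making progress in these cases, you can set the `max_calls` option in a task's `@ray.remote` decorator. Once a worker has executed this many invocations of the given remote function, it automatically exits. By default, `max_calls` is set to infinity.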
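A toy pool makes the effect of `max_calls` visible: a worker that leaks a little memory on every call is retired after a fixed number of invocations, and a fresh one takes its place. `ToyWorker` and `ToyWorkerPool` are hypothetical, single-process stand-ins for Ray workers, and `max_calls=None` models the unlimited default.

```python
class ToyWorker:
    """Stand-in for a worker process that leaks memory on every call."""

    def __init__(self):
        self.calls = 0
        self.leaked = []  # grows forever, like a buggy third-party library

    def run(self, fn, *args):
        self.calls += 1
        self.leaked.append(bytearray(1024))  # simulate a 1 KiB leak per call
        return fn(*args)


class ToyWorkerPool:
    """Runs tasks on one worker, replacing it after max_calls invocations."""

    def __init__(self, max_calls=None):
        self.max_calls = max_calls  # None models the unlimited default
        self.workers_started = 0
        self.peak_leak = 0
        self._worker = None

    def submit(self, fn, *args):
        if self._worker is None:
            self._worker = ToyWorker()
            self.workers_started += 1
        result = self._worker.run(fn, *args)
        self.peak_leak = max(self.peak_leak, len(self._worker.leaked))
        if self.max_calls is not None and self._worker.calls >= self.max_calls:
            self._worker = None  # the worker exits; its leaked memory goes too
        return result


pool = ToyWorkerPool(max_calls=2)
results = [pool.submit(abs, -i) for i in range(5)]
print(results, pool.workers_started, pool.peak_leak)  # [0, 1, 2, 3, 4] 3 2
```

The trade-off is that every replacement costs a worker startup, so a small `max_calls` bounds the leak tightly but adds overhead to each call.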