Anti-pattern: Serializing ray.ObjectRef out of band

Summary: Avoid serializing ray.ObjectRef out of band, because Ray can't know when to garbage collect the underlying object.

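Ray's ray.ObjectRef is distributed reference counted. Ray pins the underlying object until the system no longer uses the reference. When all references to a pinned object are gone, Ray garbage collects the object and cleans it up from the system. However, if user code serializes a ray.ObjectRef itself, Ray can't keep track of the resulting copy of the reference.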
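The reasoning above can be illustrated with a toy model. The sketch below isn't Ray's implementation: `ToyStore`, `share`, `release`, and `contains` are hypothetical names invented for this illustration, and the store is a plain in-process dictionary. It only shows why a reference copied with `pickle` stays invisible to a reference counter, while a reference handed over through the store's own API is counted.

```python
import pickle


class ToyStore:
    """Toy reference-counted store (hypothetical, not Ray's object store)."""

    def __init__(self):
        self._objects = {}    # object_id -> value
        self._refcounts = {}  # object_id -> number of tracked references

    def put(self, object_id, value):
        # Storing a value creates the first tracked reference.
        self._objects[object_id] = value
        self._refcounts[object_id] = 1
        return object_id

    def share(self, object_id):
        # "In band": the store is told that a new reference exists.
        self._refcounts[object_id] += 1
        return object_id

    def release(self, object_id):
        # Drop one tracked reference and collect the value at zero.
        self._refcounts[object_id] -= 1
        if self._refcounts[object_id] == 0:
            del self._objects[object_id]
            del self._refcounts[object_id]

    def contains(self, object_id):
        return object_id in self._objects


def in_band_demo():
    store = ToyStore()
    ref = store.put("obj-1", [1, 2, 3])
    borrowed = store.share(ref)  # the store counts this second reference
    store.release(ref)           # the original holder is done
    return store.contains(borrowed)


def out_of_band_demo():
    store = ToyStore()
    ref = store.put("obj-2", [1, 2, 3])
    blob = pickle.dumps(ref)     # the store never sees this copy
    store.release(ref)           # count drops to zero, value is collected
    restored = pickle.loads(blob)
    return store.contains(restored)


print(in_band_demo())      # True: the borrowed reference kept the value alive
print(out_of_band_demo())  # False: the pickled reference now dangles
```

In this toy, the dangling reference is detected immediately. In a distributed system, the equivalent lookup may instead wait for an object that never arrives.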

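To avoid incorrect behavior, if ray.cloudpickle serializes a ray.ObjectRef, Ray pins the object for the lifetime of the worker. "Pinning" means that the object can't be evicted from the object store until the corresponding owner worker dies. This makes Ray object leaks likely, which in turn can lead to spilling to disk. See this page for more details.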

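To detect whether this pattern exists in your code, set the environment variable RAY_allow_out_of_band_object_ref_serialization=0. If Ray then detects that ray.cloudpickle serialized a ray.ObjectRef, it raises an exception with a helpful message.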
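As a minimal sketch of turning this check on for a whole job rather than for a single task, the variable can be passed through the `runtime_env` argument of `ray.init`. The names `STRICT_REF_SERIALIZATION_ENV` and `init_ray_strict` are hypothetical helpers introduced here for illustration. A `runtime_env` applies to worker processes, so this sketch assumes the serialization you want to catch happens inside tasks or actors.

```python
# Environment variable named in the text above. The value "0" disallows
# out-of-band serialization, so Ray raises instead of pinning the object.
STRICT_REF_SERIALIZATION_ENV = {
    "env_vars": {"RAY_allow_out_of_band_object_ref_serialization": "0"}
}


def init_ray_strict():
    """Start Ray so tasks and actors in this job inherit the flag.

    Hypothetical helper for illustration, not part of Ray.
    """
    # Imported here so the constant above can be reused without starting Ray.
    import ray

    return ray.init(runtime_env=STRICT_REF_SERIALIZATION_ENV)
```

Calling `init_ray_strict()` in place of a plain `ray.init()` during debugging should make any task that serializes a ray.ObjectRef with ray.cloudpickle fail loudly instead of leaking the object.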

Code example

Anti-pattern:

import ray
import pickle
from ray._private.internal_api import memory_summary
import ray.exceptions

ray.init()


@ray.remote
def out_of_band_serialization_pickle():
    obj_ref = ray.put(1)
    import pickle

    # obj_ref is serialized by user code with the standard pickle module.
    # Ray can't track the serialized copy, so the underlying object can be
    # garbage collected unexpectedly, which can make a later ray.get hang.
    return pickle.dumps(obj_ref)


@ray.remote
def out_of_band_serialization_ray_cloudpickle():
    obj_ref = ray.put(1)
    from ray import cloudpickle

    # ray.cloudpickle can serialize the ref only when the env var
    # RAY_allow_out_of_band_object_ref_serialization is 1 (the default).
    # However, obj_ref is then pinned for the lifetime of the worker,
    # which can leak Ray objects and lead to spilling.
    return cloudpickle.dumps(obj_ref)


print("==== serialize object ref with pickle ====")
result = ray.get(out_of_band_serialization_pickle.remote())
try:
    ray.get(pickle.loads(result), timeout=5)
except ray.exceptions.GetTimeoutError:
    print("Underlying object is unexpectedly GC'ed!\n\n")

print("==== serialize object ref with ray.cloudpickle ====")
# By default, serializing a ray.ObjectRef with ray.cloudpickle is allowed.
ray.get(out_of_band_serialization_ray_cloudpickle.remote())
# The memory summary shows the object is still pinned, even though its
# reference has gone out of scope and is no longer used.
print(memory_summary())

print(
    "==== serialize object ref with ray.cloudpickle with env var "
    "RAY_allow_out_of_band_object_ref_serialization=0 for debugging ===="
)
try:
    ray.get(
        out_of_band_serialization_ray_cloudpickle.options(
            runtime_env={
                "env_vars": {
                    "RAY_allow_out_of_band_object_ref_serialization": "0",
                }
            }
        ).remote()
    )
except Exception as e:
    print(f"Exception raised from out_of_band_serialization_ray_cloudpickle {e}\n\n")