Anti-pattern: Serializing ray.ObjectRef out of band
Summary: Avoid serializing ray.ObjectRef, because Ray can't know when to garbage-collect the underlying object.
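Ray's ray.ObjectRef is distributed reference counted. Ray pins the underlying object until the system no longer uses the reference. When all references to a pinned object are gone, Ray garbage-collects the object and cleans it up from the system. However, if user code serializes a ray.ObjectRef itself, Ray can't keep track of that reference.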
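The reference-counting rule above can be illustrated with a small pure-Python toy model. `ToyObjectStore` and its methods are hypothetical stand-ins, not Ray APIs. In this sketch, an object lives while its count of tracked references is above zero, passing a reference in band bumps the count, and a plain `pickle.dumps` of the reference copies it without the store noticing.

```python
import pickle


class ToyObjectStore:
    """Hypothetical toy model of reference-counted object storage (not a Ray API)."""

    def __init__(self):
        self.objects = {}     # object_id -> value
        self.ref_counts = {}  # object_id -> number of references the store knows about
        self._next_id = 0

    def put(self, value):
        object_id = self._next_id
        self._next_id += 1
        self.objects[object_id] = value
        self.ref_counts[object_id] = 1  # the returned reference is the first tracked one
        return object_id

    def add_ref(self, object_id):
        # Called when a reference is passed in band, e.g. as a task argument.
        self.ref_counts[object_id] += 1

    def release(self, object_id):
        # Called when a tracked reference goes out of scope.
        self.ref_counts[object_id] -= 1
        if self.ref_counts[object_id] == 0:
            # No tracked reference is left, so the object is garbage-collected.
            del self.ref_counts[object_id]
            del self.objects[object_id]

    def get(self, object_id):
        if object_id not in self.objects:
            raise KeyError(f"object {object_id} was garbage-collected")
        return self.objects[object_id]


store = ToyObjectStore()

# In band: handing the reference to another task bumps the count.
tracked = store.put("in-band")
store.add_ref(tracked)   # the receiving task now holds a tracked reference
store.release(tracked)   # the caller's copy goes away
in_band_value = store.get(tracked)  # still alive: one tracked reference remains

# Out of band: plain pickle copies the reference without telling the store.
untracked = store.put("out-of-band")
serialized = pickle.dumps(untracked)
store.release(untracked)  # the last tracked reference is gone, so the object is freed
try:
    store.get(pickle.loads(serialized))
    out_of_band_lost = False
except KeyError:
    out_of_band_lost = True

print("in-band value:", in_band_value)
print("out-of-band object lost:", out_of_band_lost)
```

The deserialized copy points at an object the store has already freed, which is the failure mode the pickle example further down demonstrates against a real cluster.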
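To avoid incorrect behavior, if ray.cloudpickle serializes a ray.ObjectRef, Ray pins the object for the lifetime of the worker. "Pinned" means that the object can't be evicted from the object store until the corresponding owner worker dies. This can easily lead to Ray object leaks, which in turn can cause disk spilling. See this page for more details.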
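A similar toy sketch, again with hypothetical names (`ToyWorker`, `serialize_out_of_band`) rather than Ray internals, shows why this conservative pinning policy leaks: each out-of-band serialization adds a pin that nothing releases until the worker exits, even if the serialized bytes are thrown away immediately.

```python
import pickle


class ToyWorker:
    """Hypothetical worker applying a pin-until-exit policy (not Ray code)."""

    def __init__(self):
        self.pinned = set()  # object IDs that cannot be evicted while this worker lives

    def serialize_out_of_band(self, object_id):
        # The returned bytes could be deserialized at any later time, so the
        # object is pinned for the rest of the worker's lifetime.
        self.pinned.add(object_id)
        return pickle.dumps(object_id)

    def exit(self):
        # Pins are released only when the worker dies.
        released = sorted(self.pinned)
        self.pinned.clear()
        return released


worker = ToyWorker()
for object_id in range(1000):
    # Each payload is discarded right away, but its object stays pinned.
    worker.serialize_out_of_band(object_id)

pinned_while_alive = len(worker.pinned)
released_on_exit = worker.exit()
print(pinned_while_alive, len(released_on_exit), len(worker.pinned))
```

In a long-lived worker, the pinned set only grows, which is how the leak eventually pushes objects out of memory and onto disk.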
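To detect whether your code uses this pattern, set the environment variable RAY_allow_out_of_band_object_ref_serialization=0. If Ray then detects that ray.cloudpickle serialized a ray.ObjectRef, it raises an exception with a helpful message.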
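The detection switch can be sketched the same way. The helper `serialize_out_of_band` and the `OutOfBandSerializationError` type are hypothetical; only the environment variable name comes from Ray. In this sketch, the value "0" turns out-of-band serialization into an error, and an unset variable keeps the permissive default.

```python
import os
import pickle

ENV_VAR = "RAY_allow_out_of_band_object_ref_serialization"


class OutOfBandSerializationError(RuntimeError):
    """Hypothetical error type standing in for the exception Ray raises."""


def serialize_out_of_band(object_id, env=os.environ):
    # Hypothetical helper: "0" makes out-of-band serialization a hard error;
    # in this sketch any other value, or an unset variable, allows it.
    if env.get(ENV_VAR, "1") == "0":
        raise OutOfBandSerializationError(
            f"object ref {object_id} was serialized out of band"
        )
    return pickle.dumps(object_id)


# Default behavior: serialization succeeds silently.
allowed = serialize_out_of_band(7, env={})

# Debugging mode: the same call fails loudly, pointing at the offending code.
try:
    serialize_out_of_band(7, env={ENV_VAR: "0"})
    detected = False
except OutOfBandSerializationError as e:
    detected = True
    print("detected:", e)
```

Failing fast at serialization time surfaces the problem where it is introduced, instead of as a later leak or hang.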
Code example
Anti-pattern:
import ray
import pickle
from ray._private.internal_api import memory_summary
import ray.exceptions
ray.init()
@ray.remote
def out_of_band_serialization_pickle():
    obj_ref = ray.put(1)
    # obj_ref is serialized from user code with the regular pickle module.
    # Ray can't keep track of the reference, so the underlying object
    # can be garbage-collected unexpectedly, which can cause unexpected hangs.
    return pickle.dumps(obj_ref)


@ray.remote
def out_of_band_serialization_ray_cloudpickle():
    obj_ref = ray.put(1)
    from ray import cloudpickle

    # ray.cloudpickle can serialize obj_ref only when out-of-band serialization
    # is allowed (RAY_allow_out_of_band_object_ref_serialization=1, the default).
    # However, the object is then pinned for the lifetime of the worker,
    # which can cause Ray object leaks that lead to disk spilling.
    return cloudpickle.dumps(obj_ref)
print("==== serialize object ref with pickle ====")
result = ray.get(out_of_band_serialization_pickle.remote())
try:
    ray.get(pickle.loads(result), timeout=5)
except ray.exceptions.GetTimeoutError:
    print("Underlying object is unexpectedly GC'ed!\n\n")

print("==== serialize object ref with ray.cloudpickle ====")
# By default, serializing a ray.ObjectRef with ray.cloudpickle is allowed.
ray.get(out_of_band_serialization_ray_cloudpickle.options().remote())
# The object is still pinned, even though the task has finished and
# nothing uses the reference anymore.
print(memory_summary())
print(
    "==== serialize object ref with ray.cloudpickle with env var "
    "RAY_allow_out_of_band_object_ref_serialization=0 for debugging ===="
)
try:
    ray.get(
        out_of_band_serialization_ray_cloudpickle.options(
            runtime_env={
                "env_vars": {
                    "RAY_allow_out_of_band_object_ref_serialization": "0",
                }
            }
        ).remote()
    )
except Exception as e:
    print(f"Exception raised from out_of_band_serialization_ray_cloudpickle {e}\n\n")