Anti-pattern: Serializing ray.ObjectRef out of band
Summary: Avoid serializing ray.ObjectRef, because Ray can't tell when to garbage collect the underlying object.
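Ray's ray.ObjectRef is distributed reference counted. Ray pins the underlying object until the system no longer uses the reference. Once all references to a pinned object are gone, Ray garbage collects the object and cleans it up from the system. However, if user code serializes a ray.ObjectRef, Ray can't keep track of that reference.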
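The effect described above can be illustrated with a small stdlib-only model. This is a sketch under stated assumptions, not Ray's implementation: `ToyStore`, `ToyRef`, and `demo` are hypothetical names, and reference release is explicit rather than automatic. It shows that a reference which leaves the system as raw bytes is invisible to the counter, so the object can be collected while the bytes still exist.

```python
import pickle


class ToyStore:
    """Toy reference-counted store (hypothetical; not Ray's object store)."""

    def __init__(self):
        self._values = {}  # object id -> stored value
        self._counts = {}  # object id -> number of tracked references

    def put(self, object_id, value):
        self._values[object_id] = value
        self._counts[object_id] = 0
        return ToyRef(self, object_id)

    def incref(self, object_id):
        self._counts[object_id] += 1

    def decref(self, object_id):
        self._counts[object_id] -= 1
        if self._counts[object_id] == 0:
            # No tracked reference is left, so the value is collected.
            del self._values[object_id]
            del self._counts[object_id]

    def get(self, object_id):
        # Raises KeyError if the value has already been collected.
        return self._values[object_id]


class ToyRef:
    """Tracked handle: creating it increments the count."""

    def __init__(self, store, object_id):
        self.store = store
        self.object_id = object_id
        store.incref(object_id)

    def release(self):
        self.store.decref(self.object_id)


def demo():
    store = ToyStore()
    ref = store.put("obj-1", 42)
    # Out of band: only the raw id leaves as bytes, so the store
    # never learns that another "reference" exists.
    blob = pickle.dumps(ref.object_id)
    ref.release()  # last tracked reference goes away
    try:
        store.get(pickle.loads(blob))
        return "still alive"
    except KeyError:
        return "collected"
```

In this model, `demo()` ends in the `KeyError` branch: the pickled id outlives the object it names, which is the failure mode that reference counting alone cannot prevent.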
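To avoid incorrect behavior, if ray.cloudpickle serializes a ray.ObjectRef, Ray pins the object for the lifetime of the worker. "Pinned" means the object can't be evicted from the object store until the corresponding owner worker dies. This makes Ray object leaks likely, which in turn can lead to disk spilling. See this page for more details.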
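To detect whether this pattern exists in your code, set the environment variable RAY_allow_out_of_band_object_ref_serialization=0. If Ray then detects that ray.cloudpickle serialized a ray.ObjectRef, it raises an exception with a helpful message.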
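One way to apply this setting to individual tasks is through a `runtime_env` dictionary with an `env_vars` entry. The helper below is a minimal sketch; `debug_runtime_env` is a hypothetical name introduced here for illustration and is not part of Ray.

```python
# Name of the environment variable described above.
OOB_ENV_VAR = "RAY_allow_out_of_band_object_ref_serialization"


def debug_runtime_env(extra_env_vars=None):
    """Build a runtime_env dict that disables out-of-band ObjectRef
    serialization (hypothetical helper, not a Ray API)."""
    env_vars = dict(extra_env_vars or {})
    # "0" asks Ray to raise instead of silently pinning the object.
    env_vars[OOB_ENV_VAR] = "0"
    return {"env_vars": env_vars}
```

The result can be passed as `some_task.options(runtime_env=debug_runtime_env()).remote()`, where `some_task` stands for any remote function. Scoping the flag to a task this way keeps the stricter check limited to the code under investigation.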
Code example
Anti-pattern:
import ray
import pickle
from ray._private.internal_api import memory_summary
import ray.exceptions

ray.init()


@ray.remote
def out_of_band_serialization_pickle():
    obj_ref = ray.put(1)
    import pickle

    # obj_ref is serialized from user code using a regular pickle.
    # Ray can't keep track of the reference, so the underlying object
    # can be GC'ed unexpectedly, which can cause unexpected hangs.
    return pickle.dumps(obj_ref)


@ray.remote
def out_of_band_serialization_ray_cloudpickle():
    obj_ref = ray.put(1)
    from ray import cloudpickle

    # ray.cloudpickle can serialize only when the
    # RAY_allow_out_of_band_object_ref_serialization=1 env var is set.
    # However, obj_ref is pinned for the lifetime of the worker,
    # which can cause Ray object leaks that can cause spilling.
    return cloudpickle.dumps(obj_ref)


print("==== serialize object ref with pickle ====")
result = ray.get(out_of_band_serialization_pickle.remote())
try:
    ray.get(pickle.loads(result), timeout=5)
except ray.exceptions.GetTimeoutError:
    print("Underlying object is unexpectedly GC'ed!\n\n")

print("==== serialize object ref with ray.cloudpickle ====")
# By default, it's allowed to serialize ray.ObjectRef using
# ray.cloudpickle.
ray.get(out_of_band_serialization_ray_cloudpickle.options().remote())
# The object is still pinned even though the reference has gone out of
# scope and is no longer used.
print(memory_summary())

print(
    "==== serialize object ref with ray.cloudpickle with env var "
    "RAY_allow_out_of_band_object_ref_serialization=0 for debugging ===="
)
try:
    ray.get(
        out_of_band_serialization_ray_cloudpickle.options(
            runtime_env={
                "env_vars": {
                    "RAY_allow_out_of_band_object_ref_serialization": "0",
                }
            }
        ).remote()
    )
except Exception as e:
    print(f"Exception raised from out_of_band_serialization_ray_cloudpickle {e}\n\n")