序列化#
由于 Ray 进程不共享内存空间,因此需要在工作节点和节点之间传输的数据进行序列化和反序列化。Ray 使用 Plasma 对象存储 来高效地跨不同进程和不同节点传输对象。对象存储中的 Numpy 数组在同一节点上的工作节点之间共享(零拷贝反序列化)。
概述#
Ray 已决定使用定制的 Pickle 协议版本 5 的反向移植版本来替换原始的 PyArrow 序列化器。这消除了许多先前的限制(例如,无法序列化递归对象)。
Ray 目前兼容 Pickle 协议版本 5,同时借助 cloudpickle,Ray 支持更广泛对象的序列化(例如,lambda 和嵌套函数、动态类)。
Plasma 对象存储#
Plasma 是一个内存对象存储。它最初是作为 Apache Arrow 的一部分开发的。在 Ray 1.0.0 版本发布之前,Ray 将 Arrow 的 Plasma 代码 fork 到 Ray 的代码库中,以便根据 Ray 的架构和性能需求进行解耦和继续开发。
Plasma 用于在不同进程和不同节点之间高效地传输对象。Plasma 对象存储中的所有对象都是不可变的,并存储在共享内存中。这样,同一节点上的许多工作节点就可以高效地访问它们。
每个节点都有自己的对象存储。当数据放入对象存储时,它不会自动广播到其他节点。数据在写入者本地保留,直到被另一个节点上的任务或 Actor 请求。
序列化 ObjectRefs#
应仅作为最后手段才使用 `ray.cloudpickle` 显式序列化 `ObjectRefs`。将 `ObjectRefs` 作为 Ray 任务的参数和返回值传递是推荐的方法。
`ObjectRefs` 可以使用 `ray.cloudpickle` 进行序列化。然后可以使用 `ray.get()` 反序列化并访问 `ObjectRef`。请注意,必须使用 `ray.cloudpickle`;其他 pickle 工具不能保证正常工作。此外,反序列化 `ObjectRef` 的进程必须是序列化它的同一 Ray 集群的一部分。
序列化时,`ObjectRef` 的值将保留在 Ray 的共享内存对象存储中。必须通过调用 `ray._private.internal_api.free(obj_ref)` 来显式释放对象。
警告
`ray._private.internal_api.free(obj_ref)` 是一个私有 API,在未来的 Ray 版本中可能会发生更改。
此代码示例演示了如何序列化 `ObjectRef`,将其存储在外部存储中,反序列化并使用它,最后释放其对象。
import ray
from ray import cloudpickle
FILE = "external_store.pickle"
ray.init()
my_dict = {"hello": "world"}
obj_ref = ray.put(my_dict)
with open(FILE, "wb+") as f:
cloudpickle.dump(obj_ref, f)
# ObjectRef remains pinned in memory because
# it was serialized with ray.cloudpickle.
del obj_ref
with open(FILE, "rb") as f:
new_obj_ref = cloudpickle.load(f)
# The deserialized ObjectRef works as expected.
assert ray.get(new_obj_ref) == my_dict
# Explicitly free the object.
ray._private.internal_api.free(new_obj_ref)
Numpy 数组#
Ray 通过使用具有带外数据的 Pickle 协议 5 来优化 numpy 数组。numpy 数组被存储为只读对象,同一节点上的所有 Ray 工作节点都可以从对象存储中读取 numpy 数组而不进行复制(零拷贝读取)。工作节点进程中的每个 numpy 数组对象都持有指向共享内存中相关数组的指针。对只读对象的任何写入都需要用户首先将其复制到本地进程内存中。
提示
通常可以通过仅使用原生类型(例如,numpy 数组或包含 numpy 数组和其他基本类型的列表/字典),或者使用 Actor 来保存无法序列化的对象来避免序列化问题。
修复“赋值目标是只读的”#
由于 Ray 将 numpy 数组放入对象存储中,当它们作为参数在远程函数中反序列化时,它们将变为只读。例如,以下代码片段将崩溃
import ray
import numpy as np
@ray.remote
def f(arr):
# arr = arr.copy() # Adding a copy will fix the error.
arr[0] = 1
try:
ray.get(f.remote(np.zeros(100)))
except ray.exceptions.RayTaskError as e:
print(e)
# ray.exceptions.RayTaskError(ValueError): ray::f()
# File "test.py", line 6, in f
# arr[0] = 1
# ValueError: assignment destination is read-only
为避免此问题,如果您需要修改数组,可以在目标处手动复制数组(`arr = arr.copy()`)。请注意,这实际上相当于禁用了 Ray 提供的零拷贝反序列化功能。
序列化注意事项#
Ray 目前正在使用 Pickle 协议版本 5。大多数 Python 发行版使用的默认 pickle 协议是协议 3。对于较大的对象,协议 4 和 5 比协议 3 更高效。
对于非原生对象,即使它在一个对象中被引用多次,Ray 也会始终保留一个副本。
import ray import numpy as np obj = [np.zeros(42)] * 99 l = ray.get(ray.put(obj)) assert l[0] is l[1] # no problem!
尽可能使用 numpy 数组或 numpy 数组的 Python 集合以获得最佳性能。
锁对象大多是不可序列化的,因为复制锁没有意义,并且可能导致严重的并发问题。如果您的对象包含锁,您可能需要找到一个解决方法。
只读张量的零拷贝序列化#
Ray 为只读 PyTorch 张量提供可选的零拷贝序列化。Ray 通过将这些张量转换为 NumPy 数组并利用 pickle5 的零拷贝缓冲区共享来序列化它们。这避免了复制底层张量数据,从而在跨任务或 Actor 传递大型张量时提高性能。但是,PyTorch 不原生支持只读张量,因此此功能必须谨慎使用。
启用该功能后,Ray 不会复制并允许写入共享内存。一个进程在 `ray.get()` 后更改张量可能会反映在另一个进程中,如果两个进程都位于同一节点上。此功能在以下条件下效果最好
张量具有 `requires_grad = False`(即,从自动微分图中分离)。
张量在内存中是连续的(`tensor.is_contiguous()`)。
如果张量位于 CPU 内存中,性能提升会更大。
您没有使用 Ray Direct Transport。
此功能默认禁用。您可以通过设置环境变量 `RAY_ENABLE_ZERO_COPY_TORCH_TENSORS` 来启用它。在运行脚本之前,在外部设置此变量以在驱动进程中启用零拷贝序列化。
export RAY_ENABLE_ZERO_COPY_TORCH_TENSORS=1
以下示例使用 `ray.get()` 计算一个 1GiB 张量的总和,利用零拷贝序列化
import ray
import torch
import time
ray.init(runtime_env={"env_vars": {"RAY_ENABLE_ZERO_COPY_TORCH_TENSORS": "1"}})
@ray.remote
def process(tensor):
return tensor.sum()
x = torch.ones(1024, 1024, 256)
start_time = time.perf_counter()
result = ray.get(process.remote(x))
elapsed_time = time.perf_counter() - start_time
print(f"Elapsed time: {elapsed_time}s")
assert result == x.sum()
在此示例中,启用零拷贝序列化可将端到端延迟降低 66.3%。
# Without Zero-Copy Serialization
Elapsed time: 23.53883756196592s
# With Zero-Copy Serialization
Elapsed time: 7.933729998010676s
自定义序列化#
有时您可能希望自定义序列化过程,因为 Ray 使用的默认序列化器(pickle5 + cloudpickle)不适合您(无法序列化某些对象、对某些对象太慢等)。
至少有 3 种方法可以定义自定义序列化过程
如果您想自定义一类对象的序列化,并且您有权访问代码,您可以在相应类中定义 `__reduce__` 函数。这是大多数 Python 库通常会做的事情。示例代码
import ray import sqlite3 class DBConnection: def __init__(self, path): self.path = path self.conn = sqlite3.connect(path) # without '__reduce__', the instance is unserializable. def __reduce__(self): deserializer = DBConnection serialized_data = (self.path,) return deserializer, serialized_data original = DBConnection("/tmp/db") print(original.conn) copied = ray.get(ray.put(original)) print(copied.conn)
<sqlite3.Connection object at ...> <sqlite3.Connection object at ...>
如果您想自定义一类对象的序列化,但您无法访问或修改相应的类,您可以使用您使用的序列化器注册该类。
import ray import threading class A: def __init__(self, x): self.x = x self.lock = threading.Lock() # could not be serialized! try: ray.get(ray.put(A(1))) # fail! except TypeError: pass def custom_serializer(a): return a.x def custom_deserializer(b): return A(b) # Register serializer and deserializer for class A: ray.util.register_serializer( A, serializer=custom_serializer, deserializer=custom_deserializer) ray.get(ray.put(A(1))) # success! # You can deregister the serializer at any time. ray.util.deregister_serializer(A) try: ray.get(ray.put(A(1))) # fail! except TypeError: pass # Nothing happens when deregister an unavailable serializer. ray.util.deregister_serializer(A)
注意:序列化器由每个 Ray 工作节点本地管理。因此,对于每个 Ray 工作节点,如果您想使用该序列化器,则需要注册该序列化器。注销序列化器也仅在本地适用。
如果您为类注册了一个新序列化器,新序列化器将立即在工作节点中替换旧序列化器。此 API 也是幂等的,重新注册相同的序列化器不会产生副作用。
如果您想自定义特定对象的序列化,我们也为您提供了一个示例。
import threading class A: def __init__(self, x): self.x = x self.lock = threading.Lock() # could not serialize! try: ray.get(ray.put(A(1))) # fail! except TypeError: pass class SerializationHelperForA: """A helper class for serialization.""" def __init__(self, a): self.a = a def __reduce__(self): return A, (self.a.x,) ray.get(ray.put(SerializationHelperForA(A(1)))) # success! # the serializer only works for a specific object, not all A # instances, so we still expect failure here. try: ray.get(ray.put(A(1))) # still fail! except TypeError: pass
异常的自定义序列化器#
当 Ray 任务引发使用默认 pickle 机制无法序列化的异常时,您可以注册自定义序列化器来处理它们(注意:序列化器必须在驱动程序和所有工作节点中注册)。
import ray
import threading
class CustomError(Exception):
def __init__(self, message, data):
self.message = message
self.data = data
self.lock = threading.Lock() # Cannot be serialized
def custom_serializer(exc):
return {"message": exc.message, "data": str(exc.data)}
def custom_deserializer(state):
return CustomError(state["message"], state["data"])
# Register in the driver
ray.util.register_serializer(
CustomError,
serializer=custom_serializer,
deserializer=custom_deserializer
)
@ray.remote
def task_that_registers_serializer_and_raises():
# Register the custom serializer in the worker
ray.util.register_serializer(
CustomError,
serializer=custom_serializer,
deserializer=custom_deserializer
)
# Now raise the custom exception
raise CustomError("Something went wrong", {"complex": "data"})
# The custom exception will be properly serialized across worker boundaries
try:
ray.get(task_that_registers_serializer_and_raises.remote())
except ray.exceptions.RayTaskError as e:
print(f"Caught exception: {e.cause}") # This will be our CustomError
当远程任务中引发自定义异常时,Ray 将
使用您的自定义序列化器序列化异常。
将其包装在 `RayTaskError` 中。
反序列化后的异常将作为 `ray_task_error.cause` 提供。
每当序列化失败时,Ray 都会抛出一个 `UnserializableException`,其中包含原始堆栈跟踪的字符串表示。
故障排除#
使用 `ray.util.inspect_serializability` 来识别棘手的 pickling 问题。此函数可用于跟踪任何 Python 对象中潜在的不可序列化对象——无论是函数、类还是对象实例。
下面,我们对一个包含不可序列化对象(线程锁)的函数演示了这种行为。
from ray.util import inspect_serializability
import threading
lock = threading.Lock()
def test():
print(lock)
inspect_serializability(test, name="test")
生成的输出是
=============================================================
Checking Serializability of <function test at 0x7ff130697e50>
=============================================================
!!! FAIL serialization: cannot pickle '_thread.lock' object
Detected 1 global variables. Checking serializability...
Serializing 'lock' <unlocked _thread.lock object at 0x7ff1306a9f30>...
!!! FAIL serialization: cannot pickle '_thread.lock' object
WARNING: Did not find non-serializable object in <unlocked _thread.lock object at 0x7ff1306a9f30>. This may be an oversight.
=============================================================
Variable:
FailTuple(lock [obj=<unlocked _thread.lock object at 0x7ff1306a9f30>, parent=<function test at 0x7ff130697e50>])
was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
=============================================================
Check https://docs.rayai.org.cn/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
=============================================================
要获得更详细的信息,请在导入 Ray 之前设置环境变量 `RAY_PICKLE_VERBOSE_DEBUG='2'`。这会启用使用基于 Python 的后端而不是 C-Pickle 的序列化,因此您可以在序列化过程中调试 Python 代码。然而,这会使序列化速度大大变慢。
已知问题#
在使用某些 python3.8 和 3.9 版本时,用户可能会遇到内存泄漏。这是由于Python 的 pickle 模块中的一个 bug。
此问题已在 Python 3.8.2rc1、Python 3.9.0 alpha 4 或更高版本中得到解决。