对象#
在 Ray 中,任务(tasks)和 Actor 会创建和计算对象。我们将这些对象称为远程对象,因为它们可以存储在 Ray 集群中的任何位置,我们使用对象引用(object refs)来引用它们。远程对象被缓存在 Ray 的分布式 共享内存对象存储(object store)中,集群中的每个节点都有一个对象存储。在集群环境中,一个远程对象可以存在于一个或多个节点上,与持有对象引用的数量无关。
对象引用本质上是一个指针或唯一 ID,用于引用远程对象而不暴露其值。如果您熟悉 Futures,Ray 的对象引用概念上与之类似。
对象引用可以通过两种方式创建。
它们是由远程函数调用返回的。
它们是由
ray.put()返回的。
import ray
# Put an object in Ray's object store.
y = 1
object_ref = ray.put(y)
// Put an object in Ray's object store.
int y = 1;
ObjectRef<Integer> objectRef = Ray.put(y);
// Put an object in Ray's object store.
int y = 1;
ray::ObjectRef<int> object_ref = ray::Put(y);
注意
远程对象是不可变的。也就是说,它们的值在创建后不能被修改。这使得远程对象可以在多个对象存储中进行复制,而无需同步这些副本。
获取对象数据#
您可以使用 ray.get() 方法从对象引用中获取远程对象的结果。如果当前节点的对象存储不包含该对象,则会下载该对象。
如果对象是 numpy 数组 或 numpy 数组的集合,则 get 调用是零拷贝的,并返回由共享对象存储内存支持的数组。否则,我们将反序列化对象数据为 Python 对象。
import ray
import time
# Get the value of one object ref.
obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1
# Get the values of multiple object refs in parallel.
assert ray.get([ray.put(i) for i in range(3)]) == [0, 1, 2]
# You can also set a timeout to return early from a ``get``
# that's blocking for too long.
from ray.exceptions import GetTimeoutError
# ``GetTimeoutError`` is a subclass of ``TimeoutError``.
@ray.remote
def long_running_function():
time.sleep(8)
obj_ref = long_running_function.remote()
try:
ray.get(obj_ref, timeout=4)
except GetTimeoutError: # You can capture the standard "TimeoutError" instead
print("`get` timed out.")
`get` timed out.
// Get the value of one object ref.
ObjectRef<Integer> objRef = Ray.put(1);
Assert.assertTrue(objRef.get() == 1);
// You can also set a timeout(ms) to return early from a ``get`` that's blocking for too long.
Assert.assertTrue(objRef.get(1000) == 1);
// Get the values of multiple object refs in parallel.
List<ObjectRef<Integer>> objectRefs = new ArrayList<>();
for (int i = 0; i < 3; i++) {
objectRefs.add(Ray.put(i));
}
List<Integer> results = Ray.get(objectRefs);
Assert.assertEquals(results, ImmutableList.of(0, 1, 2));
// Ray.get timeout example: Ray.get will throw an RayTimeoutException if time out.
public class MyRayApp {
public static int slowFunction() throws InterruptedException {
TimeUnit.SECONDS.sleep(10);
return 1;
}
}
Assert.assertThrows(RayTimeoutException.class,
() -> Ray.get(Ray.task(MyRayApp::slowFunction).remote(), 3000));
// Get the value of one object ref.
ray::ObjectRef<int> obj_ref = ray::Put(1);
assert(*obj_ref.Get() == 1);
// Get the values of multiple object refs in parallel.
std::vector<ray::ObjectRef<int>> obj_refs;
for (int i = 0; i < 3; i++) {
obj_refs.emplace_back(ray::Put(i));
}
auto results = ray::Get(obj_refs);
assert(results.size() == 3);
assert(*results[0] == 0);
assert(*results[1] == 1);
assert(*results[2] == 2);
传递对象参数#
Ray 对象引用可以在 Ray 应用程序中自由传递。这意味着它们可以作为任务、Actor 方法的参数传递,甚至可以存储在其他对象中。对象通过分布式引用计数进行跟踪,一旦对象的所有引用被删除,其数据就会被自动释放。
可以将对象传递给 Ray 任务或方法的两种不同方式。根据传递对象的方式,Ray 会决定是否在任务执行前解引用该对象。
将对象作为顶级参数传递:当对象直接作为顶级参数传递给任务时,Ray 会解引用该对象。这意味着 Ray 会获取所有顶级对象引用参数的底层数据,直到对象数据完全可用才会执行任务。
import ray
@ray.remote
def echo(a: int, b: int, c: int):
"""This function prints its input values to stdout."""
print(a, b, c)
# Passing the literal values (1, 2, 3) to `echo`.
echo.remote(1, 2, 3)
# -> prints "1 2 3"
# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)
# Passing an object as a top-level argument to `echo`. Ray will de-reference top-level
# arguments, so `echo` will see the literal values (1, 2, 3) in this case as well.
echo.remote(a, b, c)
# -> prints "1 2 3"
将对象作为嵌套参数传递:当对象作为嵌套对象的一部分传递时,例如,在 Python 列表内,Ray 将不会解引用它。这意味着任务需要对该引用调用 ray.get() 来获取具体值。但是,如果任务从不调用 ray.get(),则对象值永远不需要传输到任务运行的机器上。我们建议在可能的情况下将对象作为顶级参数传递,但嵌套参数对于将对象传递给其他任务而无需查看数据很有用。
import ray
@ray.remote
def echo_and_get(x_list): # List[ObjectRef]
"""This function prints its input values to stdout."""
print("args:", x_list)
print("values:", ray.get(x_list))
# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)
# Passing an object as a nested argument to `echo_and_get`. Ray does not
# de-reference nested args, so `echo_and_get` sees the references.
echo_and_get.remote([a, b, c])
# -> prints args: [ObjectRef(...), ObjectRef(...), ObjectRef(...)]
# values: [1, 2, 3]
顶级 vs 非顶级传递约定也适用于 Actor 构造函数和 Actor 方法调用。
@ray.remote
class Actor:
def __init__(self, arg):
pass
def method(self, arg):
pass
obj = ray.put(2)
# Examples of passing objects to actor constructors.
actor_handle = Actor.remote(obj) # by-value
actor_handle = Actor.remote([obj]) # by-reference
# Examples of passing objects to actor method calls.
actor_handle.method.remote(obj) # by-value
actor_handle.method.remote([obj]) # by-reference
对象的闭包捕获#
您还可以通过闭包捕获将对象传递给任务。当您有一个大型对象希望在多个任务或 Actor 之间原样共享,而不想反复将其作为参数传递时,这会很方便。但请注意,定义一个闭包捕获对象引用的任务将通过引用计数固定该对象,因此在作业完成之前该对象不会被逐出。
import ray
# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)
@ray.remote
def print_via_capture():
"""This function prints the values of (a, b, c) to stdout."""
print(ray.get([a, b, c]))
# Passing object references via closure-capture. Inside the `print_via_capture`
# function, the global object refs (a, b, c) can be retrieved and printed.
print_via_capture.remote()
# -> prints [1, 2, 3]
嵌套对象#
Ray 还支持嵌套对象引用。这允许您构建包含对更深层子对象的引用的复合对象。
# Objects can be nested within each other. Ray will keep the inner object
# alive via reference counting until all outer object references are deleted.
object_ref_2 = ray.put([object_ref])
Fault Tolerance#
Ray 可以通过 lineage reconstruction 自动从对象数据丢失中恢复,但不能从 owner 故障中恢复。有关更多详细信息,请参阅 Ray 容错。