Actors#

Actors 将 Ray API 从函数(任务)扩展到类。Actor 本质上是一个有状态的 worker(或服务)。当你实例化一个新的 Actor 时,Ray 会创建一个新的 worker,并将该 Actor 的方法调度到该特定 worker 上。这些方法可以访问和修改该 worker 的状态。

ray.remote 装饰器表明 Counter 类的实例是 Actor。每个 Actor 运行在自己的 Python 进程中。

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

# Create an actor from this class.
counter = Counter.remote()

Ray.actor 用于从常规 Java 类创建 Actor。

// A regular Java class.
public class Counter {

  private int value = 0;

  public int increment() {
    this.value += 1;
    return this.value;
  }
}

// Create an actor from this class.
// `Ray.actor` takes a factory method that can produce
// a `Counter` object. Here, we pass `Counter`'s constructor
// as the argument.
ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();

ray::Actor 用于从常规 C++ 类创建 Actor。

// A regular C++ class.
class Counter {

private:
    int value = 0;

public:
  int Increment() {
    value += 1;
    return value;
  }
};

// Factory function of Counter class.
static Counter *CreateCounter() {
    return new Counter();
};

RAY_REMOTE(&Counter::Increment, CreateCounter);

// Create an actor from this class.
// `ray::Actor` takes a factory method that can produce
// a `Counter` object. Here, we pass `Counter`'s factory function
// as the argument.
auto counter = ray::Actor(CreateCounter).Remote();

使用 State API 中的 ray list actors 查看 Actor 状态

# This API is only available when you install Ray with `pip install "ray[default]"`.
ray list actors
======== List: 2023-05-25 10:10:50.095099 ========
Stats:
------------------------------
Total: 1

Table:
------------------------------
    ACTOR_ID                          CLASS_NAME    STATE      JOB_ID  NAME    NODE_ID                                                     PID  RAY_NAMESPACE
 0  9e783840250840f87328c9f201000000  Counter       ALIVE    01000000          13a475571662b784b4522847692893a823c78f1d3fd8fd32a2624923  38906  ef9de910-64fb-4575-8eb5-50573faa3ddf

指定所需的资源#

在 Actor 中指定资源需求。有关更多详细信息,请参阅 指定任务或 Actor 的资源需求

# Specify required resources for an actor.
@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor:
    pass
// Specify required resources for an actor.
Ray.actor(Counter::new).setResource("CPU", 2.0).setResource("GPU", 0.5).remote();
// Specify required resources for an actor.
ray::Actor(CreateCounter).SetResource("CPU", 2.0).SetResource("GPU", 0.5).Remote();

调用 Actor#

你可以通过使用 remote 操作符调用 Actor 的方法来与之交互。然后,你可以对对象引用调用 get 来检索实际值。

# Call the actor.
obj_ref = counter.increment.remote()
print(ray.get(obj_ref))
1
// Call the actor.
ObjectRef<Integer> objectRef = counter.task(&Counter::increment).remote();
Assert.assertTrue(objectRef.get() == 1);
// Call the actor.
auto object_ref = counter.Task(&Counter::increment).Remote();
assert(*object_ref.Get() == 1);

在不同 Actor 上调用的方法会并行执行,而在同一 Actor 上调用的方法会按调用顺序串行执行。同一 Actor 上的方法会像下面所示那样共享状态。

# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]

# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)

# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[2, 3, 4, 5, 6]
// Create ten Counter actors.
List<ActorHandle<Counter>> counters = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    counters.add(Ray.actor(Counter::new).remote());
}

// Increment each Counter once and get the results. These tasks all happen in
// parallel.
List<ObjectRef<Integer>> objectRefs = new ArrayList<>();
for (ActorHandle<Counter> counterActor : counters) {
    objectRefs.add(counterActor.task(Counter::increment).remote());
}
// prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
System.out.println(Ray.get(objectRefs));

// Increment the first Counter five times. These tasks are executed serially
// and share state.
objectRefs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    objectRefs.add(counters.get(0).task(Counter::increment).remote());
}
// prints [2, 3, 4, 5, 6]
System.out.println(Ray.get(objectRefs));
// Create ten Counter actors.
std::vector<ray::ActorHandle<Counter>> counters;
for (int i = 0; i < 10; i++) {
    counters.emplace_back(ray::Actor(CreateCounter).Remote());
}

// Increment each Counter once and get the results. These tasks all happen in
// parallel.
std::vector<ray::ObjectRef<int>> object_refs;
for (ray::ActorHandle<Counter> counter_actor : counters) {
    object_refs.emplace_back(counter_actor.Task(&Counter::Increment).Remote());
}
// prints 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
auto results = ray::Get(object_refs);
for (const auto &result : results) {
    std::cout << *result;
}

// Increment the first Counter five times. These tasks are executed serially
// and share state.
object_refs.clear();
for (int i = 0; i < 5; i++) {
    object_refs.emplace_back(counters[0].Task(&Counter::Increment).Remote());
}
// prints 2, 3, 4, 5, 6
results = ray::Get(object_refs);
for (const auto &result : results) {
    std::cout << *result;
}

传递 Actor 句柄#

你可以将 Actor 句柄传递给其他任务。你也可以定义使用 Actor 句柄的远程函数或 Actor 方法。

import time

@ray.remote
def f(counter):
    for _ in range(10):
        time.sleep(0.1)
        counter.increment.remote()
public static class MyRayApp {

  public static void foo(ActorHandle<Counter> counter) throws InterruptedException {
    for (int i = 0; i < 1000; i++) {
      TimeUnit.MILLISECONDS.sleep(100);
      counter.task(Counter::increment).remote();
    }
  }
}
void Foo(ray::ActorHandle<Counter> counter) {
    for (int i = 0; i < 1000; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        counter.Task(&Counter::Increment).Remote();
    }
}

如果你实例化了一个 Actor,你可以将该句柄传递给各种任务。

counter = Counter.remote()

# Start some tasks that use the actor.
[f.remote(counter) for _ in range(3)]

# Print the counter value.
for _ in range(10):
    time.sleep(0.1)
    print(ray.get(counter.get_counter.remote()))
0
3
8
10
15
18
20
25
30
30
ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();

// Start some tasks that use the actor.
for (int i = 0; i < 3; i++) {
  Ray.task(MyRayApp::foo, counter).remote();
}

// Print the counter value.
for (int i = 0; i < 10; i++) {
  TimeUnit.SECONDS.sleep(1);
  System.out.println(counter.task(Counter::getCounter).remote().get());
}
auto counter = ray::Actor(CreateCounter).Remote();

// Start some tasks that use the actor.
for (int i = 0; i < 3; i++) {
  ray::Task(Foo).Remote(counter);
}

// Print the counter value.
for (int i = 0; i < 10; i++) {
  std::this_thread::sleep_for(std::chrono::seconds(1));
  std::cout << *counter.Task(&Counter::GetCounter).Remote().Get() << std::endl;
}

Actor 的类型提示和静态类型#

Ray 支持远程函数和 Actor 的 Python 类型提示,从而实现更好的 IDE 支持和静态类型检查。要获得最佳的类型推断并在使用 Actor 时通过类型检查器,请遵循以下模式:

  • 优先使用 ray.remote(MyClass) 而不是 @ray.remote 来定义 Actor:与其使用 @ray.remote 装饰你的类,不如使用 ActorClass = ray.remote(MyClass)。这保留了原始类类型,并允许类型检查器和 IDE 推断正确的类型。

  • 使用 @ray.method 来定义 Actor 方法:使用 @ray.method 装饰 Actor 方法,以启用对 Actor 句柄的远程方法调用的类型提示。

  • 使用 ActorClass ActorProxy 类型:当你实例化一个 Actor 时,将句柄注解为 ActorProxy[MyClass],以获取远程方法的类型提示。

示例

import ray
from ray.actor import ActorClass, ActorProxy

class Counter:
    def __init__(self):
        self.value = 0

    @ray.method
    def increment(self) -> int:
        self.value += 1
        return self.value

CounterActor: ActorClass[Counter] = ray.remote(Counter)
counter: ActorProxy[Counter] = CounterActor.remote()

# Type checkers and IDEs will now provide type hints for remote methods
obj_ref: ray.ObjectRef[int] = counter.increment.remote()
print(ray.get(obj_ref))

有关更多详细信息和高级模式,请参阅 Ray 中的类型提示

生成器#

Ray 与 Python 生成器语法兼容。有关更多详细信息,请参阅 Ray 生成器

取消 Actor 任务#

通过在返回的 ObjectRef 上调用 ray.cancel() 来取消 Actor 任务。

import ray
import asyncio
import time


@ray.remote
class Actor:
    async def f(self):
        try:
            await asyncio.sleep(5)
        except asyncio.CancelledError:
            print("Actor task canceled.")


actor = Actor.remote()
ref = actor.f.remote()

# Wait until task is scheduled.
time.sleep(1)
ray.cancel(ref)

try:
    ray.get(ref)
except ray.exceptions.RayTaskError:
    print("Object reference was cancelled.")

在 Ray 中,任务的取消行为取决于任务的当前状态。

未调度的任务:如果 Ray 尚未调度 Actor 任务,Ray 会尝试取消调度。当 Ray 在此阶段成功取消时,它会调用 ray.get(actor_task_ref),该调用会产生一个 TaskCancelledError

运行中的 Actor 任务(常规 Actor、线程 Actor):对于被归类为单线程 Actor 或多线程 Actor 的任务,Ray 没有提供中断机制。

运行中的异步 Actor 任务:对于被归类为 异步 Actor 的任务,Ray 会尝试取消关联的 asyncio.Task。这种取消方法符合 asyncio 任务取消 中提出的标准。请注意,如果你不在异步函数中 awaitasyncio.Task 在执行过程中不会被中断。

取消保证:Ray 会以“尽力而为”的方式尝试取消任务,这意味着取消并不总是得到保证。例如,如果取消请求未能到达执行器,任务可能不会被取消。你可以使用 ray.get(actor_task_ref) 检查任务是否已成功取消。

递归取消:Ray 会跟踪所有子任务和 Actor 任务。当给出 recursive=True 参数时,它会取消所有子任务和 Actor 任务。

Scheduling#

对于每个 Actor,Ray 会选择一个节点来运行它,并根据一些因素(如 Actor 的资源需求指定的调度策略)来做出调度决策。有关更多详细信息,请参阅 Ray 调度

Fault Tolerance#

默认情况下,Ray Actor 不会被 重启,当 Actor 意外崩溃时,Actor 任务也不会被重试。你可以通过在 ray.remote().options() 中设置 max_restartsmax_task_retries 选项来改变此行为。有关更多详细信息,请参阅 Ray 容错

常见问题解答:Actor、Worker 和资源#

Worker 和 Actor 有什么区别?

每个“Ray Worker”都是一个 Python 进程。

Ray 对任务和 Actor 的 Worker 处理方式不同。对于任务,Ray 使用“Ray Worker”来执行多个 Ray 任务。对于 Actor,Ray 将“Ray Worker”启动为一个专用的 Ray Actor。

  • 任务:当 Ray 在一台机器上启动时,会自动启动多个 Ray Worker(默认每个 CPU 一个)。Ray 使用它们来执行任务(类似于进程池)。如果你以 num_cpus=2 执行 8 个任务,并且 CPU 总数为 16(ray.cluster_resources()["CPU"] == 16),那么你将有 16 个 Worker 中的 8 个处于空闲状态。

  • Actor:Ray Actor 也是一个“Ray Worker”,但你可以在运行时使用 actor_cls.remote() 来实例化它。它的所有方法都在同一个进程上运行,使用 Ray 在定义 Actor 时指定的相同资源。请注意,与任务不同,Ray 不会重用运行 Ray Actor 的 Python 进程。当你删除 Actor 时,Ray 会终止它们。

为了最大限度地利用你的资源,你需要最大限度地提高 Worker 的工作时间。你还需要分配足够的集群资源,以便 Ray 能够运行你所有需要的 Actor 和你定义的任何其他任务。这也意味着 Ray 可以更灵活地调度任务,并且如果你不需要 Actor 的有状态部分,最好使用任务。

任务事件#

默认情况下,Ray 会跟踪 Actor 任务的执行情况,报告任务状态事件和 Ray Dashboard 及 State API 使用的剖析事件。

你可以通过在 ray.remote().options() 中将 enable_task_events 选项设置为 False 来禁用 Actor 的任务事件报告。此设置通过减少 Ray 发送到 Ray Dashboard 的数据量来降低任务执行的开销。

你也可以通过在 ray.remote().options() 中将 enable_task_events 选项设置为 False 来为某些 Actor 方法禁用任务事件报告。方法设置会覆盖 Actor 设置。

@ray.remote
class FooActor:

    # Disable task events reporting for this method.
    @ray.method(enable_task_events=False)
    def foo(self):
        pass


foo_actor = FooActor.remote()
ray.get(foo_actor.foo.remote())


关于 Ray Actor 的更多信息#