Actors#
Actors 将 Ray API 从函数(任务)扩展到类。actor 本质上是一个有状态的工作进程(或服务)。当你实例化一个新的 actor 时,Ray 会创建一个新的工作进程,并将 actor 的方法调度到该特定工作进程上执行。这些方法可以访问和修改该工作进程的状态。
ray.remote
装饰器表明 Counter
类的实例是 actor。每个 actor 运行在各自独立的 Python 进程中。
import ray
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
def get_counter(self):
return self.value
# Create an actor from this class.
counter = Counter.remote()
Ray.actor
用于从常规 Java 类创建 actor。
// A regular Java class.
public class Counter {
private int value = 0;
public int increment() {
this.value += 1;
return this.value;
}
}
// Create an actor from this class.
// `Ray.actor` takes a factory method that can produce
// a `Counter` object. Here, we pass `Counter`'s constructor
// as the argument.
ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();
ray::Actor
用于从常规 C++ 类创建 actor。
// A regular C++ class.
class Counter {
private:
int value = 0;
public:
int Increment() {
value += 1;
return value;
}
};
// Factory function of Counter class.
static Counter *CreateCounter() {
return new Counter();
};
RAY_REMOTE(&Counter::Increment, CreateCounter);
// Create an actor from this class.
// `ray::Actor` takes a factory method that can produce
// a `Counter` object. Here, we pass `Counter`'s factory function
// as the argument.
auto counter = ray::Actor(CreateCounter).Remote();
使用 ray list actors
命令(来自 State API)查看 actor 的状态
# This API is only available when you install Ray with `pip install "ray[default]"`.
ray list actors
======== List: 2023-05-25 10:10:50.095099 ========
Stats:
------------------------------
Total: 1
Table:
------------------------------
ACTOR_ID CLASS_NAME STATE JOB_ID NAME NODE_ID PID RAY_NAMESPACE
0 9e783840250840f87328c9f201000000 Counter ALIVE 01000000 13a475571662b784b4522847692893a823c78f1d3fd8fd32a2624923 38906 ef9de910-64fb-4575-8eb5-50573faa3ddf
指定所需资源#
在 actor 中指定资源需求。有关更多详细信息,请参阅指定任务或 actor 的资源需求。
# Specify required resources for an actor.
@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor:
pass
// Specify required resources for an actor.
Ray.actor(Counter::new).setResource("CPU", 2.0).setResource("GPU", 0.5).remote();
// Specify required resources for an actor.
ray::Actor(CreateCounter).SetResource("CPU", 2.0).SetResource("GPU", 0.5).Remote();
调用 actor#
你可以使用 remote
操作符调用 actor 的方法来与其交互。然后,你可以对 object ref 调用 get
来检索实际值。
# Call the actor.
obj_ref = counter.increment.remote()
print(ray.get(obj_ref))
1
// Call the actor.
ObjectRef<Integer> objectRef = counter.task(&Counter::increment).remote();
Assert.assertTrue(objectRef.get() == 1);
// Call the actor.
auto object_ref = counter.Task(&Counter::increment).Remote();
assert(*object_ref.Get() == 1);
在不同 actor 上调用的方法并行执行,而在同一 actor 上调用的方法按调用顺序串行执行。同一 actor 上的方法相互共享状态,如下所示。
# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]
# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)
# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[2, 3, 4, 5, 6]
// Create ten Counter actors.
List<ActorHandle<Counter>> counters = new ArrayList<>();
for (int i = 0; i < 10; i++) {
counters.add(Ray.actor(Counter::new).remote());
}
// Increment each Counter once and get the results. These tasks all happen in
// parallel.
List<ObjectRef<Integer>> objectRefs = new ArrayList<>();
for (ActorHandle<Counter> counterActor : counters) {
objectRefs.add(counterActor.task(Counter::increment).remote());
}
// prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
System.out.println(Ray.get(objectRefs));
// Increment the first Counter five times. These tasks are executed serially
// and share state.
objectRefs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
objectRefs.add(counters.get(0).task(Counter::increment).remote());
}
// prints [2, 3, 4, 5, 6]
System.out.println(Ray.get(objectRefs));
// Create ten Counter actors.
std::vector<ray::ActorHandle<Counter>> counters;
for (int i = 0; i < 10; i++) {
counters.emplace_back(ray::Actor(CreateCounter).Remote());
}
// Increment each Counter once and get the results. These tasks all happen in
// parallel.
std::vector<ray::ObjectRef<int>> object_refs;
for (ray::ActorHandle<Counter> counter_actor : counters) {
object_refs.emplace_back(counter_actor.Task(&Counter::Increment).Remote());
}
// prints 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
auto results = ray::Get(object_refs);
for (const auto &result : results) {
std::cout << *result;
}
// Increment the first Counter five times. These tasks are executed serially
// and share state.
object_refs.clear();
for (int i = 0; i < 5; i++) {
object_refs.emplace_back(counters[0].Task(&Counter::Increment).Remote());
}
// prints 2, 3, 4, 5, 6
results = ray::Get(object_refs);
for (const auto &result : results) {
std::cout << *result;
}
传递 actor 句柄#
你可以将 actor 句柄传递给其他任务。你还可以定义使用 actor 句柄的远程函数或 actor 方法。
import time
@ray.remote
def f(counter):
for _ in range(10):
time.sleep(0.1)
counter.increment.remote()
public static class MyRayApp {
public static void foo(ActorHandle<Counter> counter) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
TimeUnit.MILLISECONDS.sleep(100);
counter.task(Counter::increment).remote();
}
}
}
void Foo(ray::ActorHandle<Counter> counter) {
for (int i = 0; i < 1000; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
counter.Task(&Counter::Increment).Remote();
}
}
如果你实例化一个 actor,可以将该句柄传递给各种任务。
counter = Counter.remote()
# Start some tasks that use the actor.
[f.remote(counter) for _ in range(3)]
# Print the counter value.
for _ in range(10):
time.sleep(0.1)
print(ray.get(counter.get_counter.remote()))
0
3
8
10
15
18
20
25
30
30
ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();
// Start some tasks that use the actor.
for (int i = 0; i < 3; i++) {
Ray.task(MyRayApp::foo, counter).remote();
}
// Print the counter value.
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(counter.task(Counter::getCounter).remote().get());
}
auto counter = ray::Actor(CreateCounter).Remote();
// Start some tasks that use the actor.
for (int i = 0; i < 3; i++) {
ray::Task(Foo).Remote(counter);
}
// Print the counter value.
for (int i = 0; i < 10; i++) {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << *counter.Task(&Counter::GetCounter).Remote().Get() << std::endl;
}
生成器#
Ray 与 Python 生成器语法兼容。有关更多详细信息,请参阅Ray 生成器。
取消 actor 任务#
通过对返回的 ObjectRef
调用 ray.cancel()
来取消 Actor 任务。
import ray
import asyncio
import time
@ray.remote
class Actor:
async def f(self):
try:
await asyncio.sleep(5)
except asyncio.CancelledError:
print("Actor task canceled.")
actor = Actor.remote()
ref = actor.f.remote()
# Wait until task is scheduled.
time.sleep(1)
ray.cancel(ref)
try:
ray.get(ref)
except ray.exceptions.RayTaskError:
print("Object reference was cancelled.")
在 Ray 中,任务取消行为取决于任务的当前状态。
未调度的任务:如果 Ray 尚未调度 Actor 任务,Ray 会尝试取消调度。当 Ray 在此阶段成功取消时,它会调用 ray.get(actor_task_ref)
,这将产生一个 TaskCancelledError
。
正在运行的 actor 任务(常规 actor、线程 actor):对于归类为单线程 Actor 或多线程 Actor 的任务,Ray 不提供中断机制。
正在运行的异步 actor 任务:对于归类为 异步 Actors <_async-actors>
的任务,Ray 会尝试取消相关的 asyncio.Task
。这种取消方法与 asyncio 任务取消中提出的标准一致。请注意,如果你在异步函数中不使用 await
,asyncio.Task
不会在执行过程中中断。
取消保证:Ray 会尽最大努力尝试取消任务,这意味着取消并非总是能保证。例如,如果取消请求未能传达给执行器,任务可能不会被取消。你可以使用 ray.get(actor_task_ref)
检查任务是否已成功取消。
递归取消:Ray 会跟踪所有子任务和 Actor 任务。当给出 recursive=True
参数时,它会取消所有子任务和 Actor 任务。
调度#
对于每个 actor,Ray 会选择一个节点来运行它,并根据actor 的资源需求和指定的调度策略等几个因素做出调度决策。有关更多详细信息,请参阅Ray 调度。
故障容错#
默认情况下,当 Ray actor 意外崩溃时,不会重启,actor 任务也不会重试。你可以通过在 ray.remote()
和 .options()
中设置 max_restarts
和 max_task_retries
选项来改变此行为。有关更多详细信息,请参阅Ray 故障容错。
常见问题解答:Actors、Worker 和资源#
Worker 和 actor 有什么区别?
每个“Ray worker”都是一个 Python 进程。
Ray 对任务和 actor 的 worker 处理方式不同。对于任务,Ray 使用“Ray worker”执行多个 Ray 任务。对于 actor,Ray 将一个“Ray worker”作为专用的 Ray actor 启动。
任务:当 Ray 在机器上启动时,会自动启动一些 Ray worker(默认为每个 CPU 1 个)。Ray 使用它们来执行任务(类似于进程池)。如果你执行 8 个任务,每个任务需要
num_cpus=2
,并且总共有 16 个 CPU (ray.cluster_resources()["CPU"] == 16
),那么你最终会有 16 个 worker 中的 8 个空闲。Actor:Ray Actor 也是一个“Ray worker”,但你在运行时使用
actor_cls.remote()
实例化它。它的所有方法都在同一个进程上运行,使用 Ray 在你定义 Actor 时指定的相同资源。请注意,与任务不同,Ray 不会重用运行 Ray Actor 的 Python 进程。当删除 Actor 时,Ray 会终止这些进程。
为了最大限度地利用你的资源,你应该最大化 worker 的工作时间。你还需要分配足够的集群资源,以便 Ray 可以运行所有需要的 actor 以及你定义的任何其他任务。这也意味着 Ray 更灵活地调度任务,如果你不需要 actor 的有状态部分,最好使用任务。
任务事件#
默认情况下,Ray 会跟踪 actor 任务的执行,报告 Ray Dashboard 和 State API 使用的任务状态事件和性能分析事件。
你可以通过在 ray.remote()
和 .options()
中将 enable_task_events
选项设置为 False
来禁用 actor 的任务事件报告。此设置通过减少 Ray 发送给 Ray Dashboard 的数据量来降低任务执行的开销。
你还可以通过在 actor 方法的 ray.remote()
和 .options()
中将 enable_task_events
选项设置为 False
来禁用某些 actor 方法的任务事件报告。方法设置会覆盖 actor 的设置。
@ray.remote
class FooActor:
# Disable task events reporting for this method.
@ray.method(enable_task_events=False)
def foo(self):
pass
foo_actor = FooActor.remote()
ray.get(foo_actor.foo.remote())