Actor 任务执行顺序#
同步单线程 Actor#
在 Ray 中,Actor 接收来自多个提交者(包括驱动程序和 worker)的任务。对于来自同一提交者的任务,如果 Actor 任务从未重试,同步单线程 Actor 将按照提交顺序执行它们。换句话说,给定任务不会执行,直到来自同一提交者之前提交的任务执行完毕。对于 max_task_retries
设置为非零值的 Actor,在任务重试发生时,任务执行顺序不保证。
import ray
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def add(self, addition):
self.value += addition
return self.value
counter = Counter.remote()
# For tasks from the same submitter,
# they are executed according to submission order.
value0 = counter.add.remote(1)
value1 = counter.add.remote(2)
# Output: 1. The first submitted task is executed first.
print(ray.get(value0))
# Output: 3. The later submitted task is executed later.
print(ray.get(value1))
1
3
然而,Actor 不保证来自不同提交者的任务执行顺序。例如,假设一个未满足的参数阻塞了之前提交的任务。在这种情况下,Actor 仍然可以执行由其他 worker 提交的任务。
import time
import ray
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def add(self, addition):
self.value += addition
return self.value
counter = Counter.remote()
# Submit task from a worker
@ray.remote
def submitter(value):
return ray.get(counter.add.remote(value))
# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
time.sleep(5)
return value
# Submit tasks from different workers, with
# the first submitted task waiting for
# dependency resolution.
value0 = submitter.remote(delayed_resolution.remote(1))
value1 = submitter.remote(2)
# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))
3
2
异步或多线程 Actor#
异步或多线程 Actor 不保证任务执行顺序。这意味着即使之前提交的任务仍在等待执行,系统也可能执行某个任务。
import time
import ray
@ray.remote
class AsyncCounter:
def __init__(self):
self.value = 0
async def add(self, addition):
self.value += addition
return self.value
counter = AsyncCounter.remote()
# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
time.sleep(5)
return value
# Submit tasks from the driver, with
# the first submitted task waiting for
# dependency resolution.
value0 = counter.add.remote(delayed_resolution.remote(1))
value1 = counter.add.remote(2)
# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))
3
2