Actor 任务执行顺序#

同步、单线程 Actor#

在 Ray 中,Actor 会接收来自多个提交者(包括驱动程序和工作节点)的任务。对于来自同一提交者的任务,同步、单线程 Actor 会按照提交的顺序执行它们,除非您设置了 allow_out_of_order_execution,或者 Ray 重试任务。换句话说,在同一提交者之前提交的任务完成执行之前,某个给定的任务不会被执行。对于设置了 max_task_retries 为非零值的 Actor,当发生任务重试时,任务执行顺序不被保证。

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def add(self, addition):
        self.value += addition
        return self.value

counter = Counter.remote()

# For tasks from the same submitter,
# they are executed according to submission order.
value0 = counter.add.remote(1)
value1 = counter.add.remote(2)

# Output: 1. The first submitted task is executed first.
print(ray.get(value0))
# Output: 3. The later submitted task is executed later.
print(ray.get(value1))
1
3

但是,Actor 不保证来自不同提交者的任务的执行顺序。例如,假设一个未满足的参数阻塞了之前提交的任务。在这种情况下,Actor 仍然可以执行由不同工作节点提交的任务。

import time
import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def add(self, addition):
        self.value += addition
        return self.value

counter = Counter.remote()

# Submit task from a worker
@ray.remote
def submitter(value):
    return ray.get(counter.add.remote(value))

# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
    time.sleep(1)
    return value

# Submit tasks from different workers, with
# the first submitted task waiting for
# dependency resolution.
value0 = submitter.remote(delayed_resolution.remote(1))
value1 = submitter.remote(2)

# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))
3
2

异步或多线程 Actor#

异步或多线程 Actor 不保证任务的执行顺序。这意味着系统可能会执行一个任务,即使之前提交的任务还在等待执行。

import time
import ray

@ray.remote
class AsyncCounter:
    def __init__(self):
        self.value = 0

    async def add(self, addition):
        self.value += addition
        return self.value

counter = AsyncCounter.remote()

# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
    time.sleep(1)
    return value

# Submit tasks from the driver, with
# the first submitted task waiting for
# dependency resolution.
value0 = counter.add.remote(delayed_resolution.remote(1))
value1 = counter.add.remote(2)

# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))
3
2