实用工具类#

Actor 池#

模块 ray.util 包含一个实用工具类 ActorPool。此类与 multiprocessing.Pool 类似,可让您在固定的 actor 池上调度 Ray 任务。

import ray
from ray.util import ActorPool


@ray.remote
class Actor:
    def double(self, n):
        return n * 2


a1, a2 = Actor.remote(), Actor.remote()
pool = ActorPool([a1, a2])

# pool.map(..) returns a Python generator object ActorPool.map
gen = pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])
print(list(gen))
# [2, 4, 6, 8]

有关更多信息,请参阅包参考

Actor 池尚未在 Java 中实现。

Actor 池尚未在 C++ 中实现。

使用 Ray Queue 进行消息传递#

有时仅使用一个信号进行同步是不够的。如果您需要在许多任务或 actor 之间发送数据,可以使用 ray.util.queue.Queue

import ray
from ray.util.queue import Queue, Empty

ray.init()
# You can pass this object around to different tasks/actors
queue = Queue(maxsize=100)


@ray.remote
def consumer(id, queue):
    try:
        while True:
            next_item = queue.get(block=True, timeout=1)
            print(f"consumer {id} got work {next_item}")
    except Empty:
        pass


[queue.put(i) for i in range(10)]
print("Put work 1 - 10 to queue...")

consumers = [consumer.remote(id, queue) for id in range(2)]
ray.get(consumers)

Ray 的 Queue API 与 Python 的 asyncio.Queuequeue.Queue 具有相似的 API。