模式:使用流水线(pipelining)提高吞吐量#

如果你有多个工作项,并且每个工作项都需要多个步骤才能完成,你可以使用流水线(pipelining)技术来提高集群利用率并增加系统的吞吐量。

注意

流水线是提高性能的重要技术,被 Ray 库广泛使用。参见Ray Data 作为示例。

../../_images/pipelining.svg

用例示例#

你的应用程序的某个组件既需要进行计算密集型工作,又需要与其他进程进行通信。理想情况下,你希望重叠计算和通信,以充分利用 CPU 并提高总体吞吐量。

代码示例#

import ray


@ray.remote
class WorkQueue:
    def __init__(self):
        self.queue = list(range(10))

    def get_work_item(self):
        if self.queue:
            return self.queue.pop(0)
        else:
            return None


@ray.remote
class WorkerWithoutPipelining:
    def __init__(self, work_queue):
        self.work_queue = work_queue

    def process(self, work_item):
        print(work_item)

    def run(self):
        while True:
            # Get work from the remote queue.
            work_item = ray.get(self.work_queue.get_work_item.remote())

            if work_item is None:
                break

            # Do work.
            self.process(work_item)


@ray.remote
class WorkerWithPipelining:
    def __init__(self, work_queue):
        self.work_queue = work_queue

    def process(self, work_item):
        print(work_item)

    def run(self):
        self.work_item_ref = self.work_queue.get_work_item.remote()

        while True:
            # Get work from the remote queue.
            work_item = ray.get(self.work_item_ref)

            if work_item is None:
                break

            self.work_item_ref = self.work_queue.get_work_item.remote()

            # Do work while we are fetching the next work item.
            self.process(work_item)


work_queue = WorkQueue.remote()
worker_without_pipelining = WorkerWithoutPipelining.remote(work_queue)
ray.get(worker_without_pipelining.run.remote())

work_queue = WorkQueue.remote()
worker_with_pipelining = WorkerWithPipelining.remote(work_queue)
ray.get(worker_with_pipelining.run.remote())

在上面的示例中,一个 worker actor 从队列中拉取工作项,然后对其进行一些计算。如果不使用流水线,我们在请求工作项后立即调用 ray.get(),因此在 RPC 进行过程中会阻塞,导致 CPU 闲置。而使用流水线时,我们在处理当前工作项之前预先请求下一个工作项,这样就可以在 RPC 进行过程中使用 CPU,从而提高 CPU 利用率。