模式:使用流水线(pipelining)提高吞吐量#
如果你有多个工作项,并且每个工作项都需要多个步骤才能完成,你可以使用流水线(pipelining)技术来提高集群利用率并增加系统的吞吐量。
注意
流水线是提高性能的重要技术,被 Ray 库广泛使用。参见Ray Data 作为示例。
用例示例#
你的应用程序的某个组件既需要进行计算密集型工作,又需要与其他进程进行通信。理想情况下,你希望重叠计算和通信,以充分利用 CPU 并提高总体吞吐量。
代码示例#
import ray
@ray.remote
class WorkQueue:
def __init__(self):
self.queue = list(range(10))
def get_work_item(self):
if self.queue:
return self.queue.pop(0)
else:
return None
@ray.remote
class WorkerWithoutPipelining:
def __init__(self, work_queue):
self.work_queue = work_queue
def process(self, work_item):
print(work_item)
def run(self):
while True:
# Get work from the remote queue.
work_item = ray.get(self.work_queue.get_work_item.remote())
if work_item is None:
break
# Do work.
self.process(work_item)
@ray.remote
class WorkerWithPipelining:
def __init__(self, work_queue):
self.work_queue = work_queue
def process(self, work_item):
print(work_item)
def run(self):
self.work_item_ref = self.work_queue.get_work_item.remote()
while True:
# Get work from the remote queue.
work_item = ray.get(self.work_item_ref)
if work_item is None:
break
self.work_item_ref = self.work_queue.get_work_item.remote()
# Do work while we are fetching the next work item.
self.process(work_item)
work_queue = WorkQueue.remote()
worker_without_pipelining = WorkerWithoutPipelining.remote(work_queue)
ray.get(worker_without_pipelining.run.remote())
work_queue = WorkQueue.remote()
worker_with_pipelining = WorkerWithPipelining.remote(work_queue)
ray.get(worker_with_pipelining.run.remote())
在上面的示例中,一个 worker actor 从队列中拉取工作项,然后对其进行一些计算。如果不使用流水线,我们在请求工作项后立即调用 ray.get()
,因此在 RPC 进行过程中会阻塞,导致 CPU 闲置。而使用流水线时,我们在处理当前工作项之前预先请求下一个工作项,这样就可以在 RPC 进行过程中使用 CPU,从而提高 CPU 利用率。