Pattern: 使用流水线提高吞吐量#
如果您有多个工作项,并且每个工作项都需要几个步骤才能完成,那么您可以使用流水线技术来提高集群利用率并增加系统的吞吐量。
注意
流水线是提高性能的一项重要技术,并且被 Ray 库大量使用。例如,请参阅Ray Data。
用例#
您应用程序的某个组件需要同时进行计算密集型工作和与其他进程通信。理想情况下,您希望重叠计算和通信,以饱和 CPU 并提高整体吞吐量。
代码示例#
import ray
@ray.remote
class WorkQueue:
def __init__(self):
self.queue = list(range(10))
def get_work_item(self):
if self.queue:
return self.queue.pop(0)
else:
return None
@ray.remote
class WorkerWithoutPipelining:
def __init__(self, work_queue):
self.work_queue = work_queue
def process(self, work_item):
print(work_item)
def run(self):
while True:
# Get work from the remote queue.
work_item = ray.get(self.work_queue.get_work_item.remote())
if work_item is None:
break
# Do work.
self.process(work_item)
@ray.remote
class WorkerWithPipelining:
def __init__(self, work_queue):
self.work_queue = work_queue
def process(self, work_item):
print(work_item)
def run(self):
self.work_item_ref = self.work_queue.get_work_item.remote()
while True:
# Get work from the remote queue.
work_item = ray.get(self.work_item_ref)
if work_item is None:
break
self.work_item_ref = self.work_queue.get_work_item.remote()
# Do work while we are fetching the next work item.
self.process(work_item)
work_queue = WorkQueue.remote()
worker_without_pipelining = WorkerWithoutPipelining.remote(work_queue)
ray.get(worker_without_pipelining.run.remote())
work_queue = WorkQueue.remote()
worker_with_pipelining = WorkerWithPipelining.remote(work_queue)
ray.get(worker_with_pipelining.run.remote())
在上面的示例中,一个工作 actor 从队列中拉取工作,然后对其进行一些计算。如果没有流水线,我们在请求工作项后立即调用ray.get(),因此在 RPC 进行过程中会阻塞,导致 CPU 空闲。有了流水线,我们会在处理当前工作项之前预先请求下一个工作项,因此可以在 RPC 进行过程中使用 CPU,从而提高 CPU 利用率。