模式:使用嵌套任务实现嵌套并行#
在此模式下,远程任务可以动态调用其他远程任务(包括其自身),以实现嵌套并行。当子任务可以并行化时,此模式非常有用。
但请记住,嵌套任务会带来其自身成本:额外的 worker 进程、调度开销、记账开销等。为了通过嵌套并行获得加速,请确保每个嵌套任务都执行了大量工作。有关更多详细信息,请参阅 反模式:任务粒度过细会导致过度并行,损害加速效果。
示例用例#
您想对一个大数字列表进行快速排序。通过使用嵌套任务,我们可以以分布式和并行的方式对列表进行排序。
任务树#
代码示例#
import ray
import time
from numpy import random
def partition(collection):
# Use the last element as the pivot
pivot = collection.pop()
greater, lesser = [], []
for element in collection:
if element > pivot:
greater.append(element)
else:
lesser.append(element)
return lesser, pivot, greater
def quick_sort(collection):
if len(collection) <= 200000: # magic number
return sorted(collection)
else:
lesser, pivot, greater = partition(collection)
lesser = quick_sort(lesser)
greater = quick_sort(greater)
return lesser + [pivot] + greater
@ray.remote
def quick_sort_distributed(collection):
# Tiny tasks are an antipattern.
# Thus, in our example we have a "magic number" to
# toggle when distributed recursion should be used vs
# when the sorting should be done in place. The rule
# of thumb is that the duration of an individual task
# should be at least 1 second.
if len(collection) <= 200000: # magic number
return sorted(collection)
else:
lesser, pivot, greater = partition(collection)
lesser = quick_sort_distributed.remote(lesser)
greater = quick_sort_distributed.remote(greater)
return ray.get(lesser) + [pivot] + ray.get(greater)
for size in [200000, 4000000, 8000000]:
print(f"Array size: {size}")
unsorted = random.randint(1000000, size=(size)).tolist()
s = time.time()
quick_sort(unsorted)
print(f"Sequential execution: {(time.time() - s):.3f}")
s = time.time()
ray.get(quick_sort_distributed.remote(unsorted))
print(f"Distributed execution: {(time.time() - s):.3f}")
print("--" * 10)
# Outputs:
# Array size: 200000
# Sequential execution: 0.040
# Distributed execution: 0.152
# --------------------
# Array size: 4000000
# Sequential execution: 6.161
# Distributed execution: 5.779
# --------------------
# Array size: 8000000
# Sequential execution: 15.459
# Distributed execution: 11.282
# --------------------
我们在两个 quick_sort_distributed
函数调用完成后才调用 ray.get()
。这可以最大限度地提高工作负载的并行性。有关更多详细信息,请参阅 反模式:在循环中调用 ray.get 会损害并行性。
请注意上面的执行时间,对于较小的任务,非分布式版本更快。然而,随着任务执行时间的增加,即因为要排序的列表更大,分布式版本更快。