反模式:过度并行化过细粒度的任务会损害加速效果#

TLDR: 避免过度并行化。并行化任务的开销比使用普通函数要高。

并行化或分布式执行任务通常比普通函数调用带来更高的开销。因此,如果你并行化一个执行速度非常快的函数,其开销可能比实际函数调用花费更长时间!

为了解决这个问题,我们应该谨慎对待过度并行化。如果你的函数或任务太小,你可以使用一种称为批量处理 (batching) 的技术,让你的任务在一次调用中完成更多有意义的工作。

代码示例#

反模式

import ray
import time
import itertools

ray.init()

numbers = list(range(10000))


def double(number):
    time.sleep(0.00001)
    return number * 2


start_time = time.time()
serial_doubled_numbers = [double(number) for number in numbers]
end_time = time.time()
print(f"Ordinary function call takes {end_time - start_time} seconds")
# Ordinary function call takes 0.16506004333496094 seconds


@ray.remote
def remote_double(number):
    return double(number)


start_time = time.time()
doubled_number_refs = [remote_double.remote(number) for number in numbers]
parallel_doubled_numbers = ray.get(doubled_number_refs)
end_time = time.time()
print(f"Parallelizing tasks takes {end_time - start_time} seconds")
# Parallelizing tasks takes 1.6061789989471436 seconds

更好的方法:使用批量处理。

@ray.remote
def remote_double_batch(numbers):
    return [double(number) for number in numbers]


BATCH_SIZE = 1000
start_time = time.time()
doubled_batch_refs = []
for i in range(0, len(numbers), BATCH_SIZE):
    batch = numbers[i : i + BATCH_SIZE]
    doubled_batch_refs.append(remote_double_batch.remote(batch))
parallel_doubled_numbers_with_batching = list(
    itertools.chain(*ray.get(doubled_batch_refs))
)
end_time = time.time()
print(f"Parallelizing tasks with batching takes {end_time - start_time} seconds")
# Parallelizing tasks with batching takes 0.030150890350341797 seconds

从上面的示例中可以看出,过度并行化会带来更高的开销,并且程序运行速度比串行版本慢。通过采用适当的批量大小进行批量处理,我们可以分摊开销并获得预期的加速效果。