使用 Ray 执行高度并行化任务#
虽然 Ray 可以用于非常复杂的并行化任务,但通常我们只是想简单地并行处理一些事情。例如,我们可能有 10 万个时间序列需要使用完全相同的算法进行处理,并且每个时间序列需要一分钟的处理时间。
很明显,在单个处理器上运行是不可行的:这将花费 70 天。即使我们设法在单台机器上使用 8 个处理器,处理时间也会缩短到 9 天。但如果我们能使用 8 台机器,每台机器有 16 个核心,大约 12 小时就能完成。
我们如何使用 Ray 来处理这类任务?
我们以计算圆周率的数字为例。算法很简单:生成随机的 x 和 y,如果 x^2 + y^2 < 1
,则它在圆内,我们将其计入。这实际上是 pi/4(回忆一下你的高中数学)。
以下代码(和本笔记本)假设您已经设置了 Ray 集群并正在头部节点上运行。有关如何设置 Ray 集群的更多详细信息,请参阅Ray 集群入门。
import ray
import random
import time
import math
from fractions import Fraction
# Let's start Ray
ray.init(address='auto')
我们使用 @ray.remote
装饰器来创建 Ray 任务。任务类似于函数,但结果是异步返回的。
它也可能不在本地机器上运行,而是在集群中的其他地方运行。这样您就可以并行运行多个任务,超出单台机器上处理器数量的限制。
@ray.remote
def pi4_sample(sample_count):
"""pi4_sample runs sample_count experiments, and returns the
fraction of time it was inside the circle.
"""
in_count = 0
for i in range(sample_count):
x = random.random()
y = random.random()
if x*x + y*y <= 1:
in_count += 1
return Fraction(in_count, sample_count)
要获取 future 的结果,我们使用 ray.get(),它会阻塞直到结果完成。
SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count = SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')
Running 1000000 tests took 1.4935967922210693 seconds
现在我们来看看我们的近似值有多好。
pi = pi4 * 4
float(pi)
3.143024
abs(pi-math.pi)/pi
0.0004554042254233261
嗯。有点偏差——勉强只有 4 位小数。为什么我们不多做 10 万倍呢?来做 1 千亿次!
FULL_SAMPLE_COUNT = 100 * 1000 * 1000 * 1000 # 100 billion samples!
BATCHES = int(FULL_SAMPLE_COUNT / SAMPLE_COUNT)
print(f'Doing {BATCHES} batches')
results = []
for _ in range(BATCHES):
results.append(pi4_sample.remote(sample_count = SAMPLE_COUNT))
output = ray.get(results)
Doing 100000 batches
请注意,在上面,我们生成了一个包含 10 万个 future 的列表。现在我们要做的就是等待结果。
根据您的 Ray 集群大小,这可能需要几分钟。但为了给您一些概念,如果我在单台机器上运行,我运行它时花了 0.4 秒。
在单个核心上,这意味着我们需要 0.4 * 100000 = 大约 11 小时。
以下是 Dashboard 的样子
所以现在,不再是单个核心处理这项任务,而是有 168 个核心共同处理。而且效率约为 80%。
pi = sum(output)*4/len(output)
float(pi)
3.14159518188
abs(pi-math.pi)/pi
8.047791203506436e-07
相当不错——我们的偏差只有百万分之一。