使用 Ray 进行高度可并行化的任务#

try-anyscale-quickstart

虽然 Ray 可用于非常复杂的并行化任务,但我们通常只想简单地并行执行某项操作。例如,我们可能有 100,000 个时间序列需要使用完全相同的算法进行处理,而每个时间序列需要一分钟的处理时间。

显然,在单个处理器上运行是不现实的:这将花费 70 天。即使我们在单台机器上使用 8 个处理器,也需要 9 天。但如果我们能使用 8 台机器,每台机器有 16 个核心,那么大约 12 小时就能完成。

我们如何使用 Ray 来处理这类任务?

我们以计算 pi 的位数这个简单示例为例。算法很简单:生成随机的 x 和 y,如果 x^2 + y^2 < 1,那么它就在圆内,我们将其计数为 1。这实际上等于 pi/4(还记得你的高中数学吗)。

下面的代码(以及这个笔记本)假设您已经设置了 Ray 集群,并且您正在头节点上运行。有关如何设置 Ray 集群的更多详细信息,请参阅 Ray 集群入门

import ray
import random
import time
import math
from fractions import Fraction
# Let's start Ray
ray.init(address='auto')

我们使用 @ray.remote 装饰器来创建 Ray 任务。任务就像一个函数,但结果是异步返回的。

它也可能不在本地机器上运行,它可能在集群的其他地方运行。这样,您可以并行运行多个任务,不受单机处理器数量的限制。

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the 
    fraction of time it was inside the circle. 
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

要获取未来结果,我们使用 ray.get(),它会阻塞直到结果完成。

SAMPLE_COUNT = 1000 * 1000
start = time.time() 
future = pi4_sample.remote(sample_count = SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')
Running 1000000 tests took 1.4935967922210693 seconds

现在让我们看看我们的近似值有多好。

pi = pi4 * 4
float(pi)
3.143024
abs(pi-math.pi)/pi
0.0004554042254233261

呃。有点偏离——只有 4 位小数。为什么我们不尝试 100,000 倍的计算量呢?让我们进行 1000 亿次计算!

FULL_SAMPLE_COUNT = 100 * 1000 * 1000 * 1000 # 100 billion samples! 
BATCHES = int(FULL_SAMPLE_COUNT / SAMPLE_COUNT)
print(f'Doing {BATCHES} batches')
results = []
for _ in range(BATCHES):
    results.append(pi4_sample.remote(sample_count = SAMPLE_COUNT))
output = ray.get(results)
Doing 100000 batches

请注意,在上面的代码中,我们生成了一个包含 100,000 个 futures 的列表。现在我们只需要等待结果。

根据您的 Ray 集群的大小,这可能需要几分钟。但为了给您一个概念,如果我们在一台机器上进行计算,当我运行它时,花费了 0.4 秒。

在单核上,这意味着我们大约需要 0.4 * 100000 = 约 11 小时。

这是仪表板的样子

View of the dashboard

所以现在,而不是只有一个核心在处理这项任务,我有 168 个核心一起工作。效率约为 80%。

pi = sum(output)*4/len(output)
float(pi)
3.14159518188
abs(pi-math.pi)/pi
8.047791203506436e-07

一点也不差——我们的误差只有百万分之一。