新手指南#

Ray 提供了一个高度灵活、简洁易用的 API。在本页面,我们描述了一些技巧,可以帮助 Ray 的新用户避免一些常见的、可能严重影响程序性能的错误。有关高级设计模式的深入讲解,请阅读 核心设计模式

本文档中使用的核心 Ray API。#

API

描述

ray.init()

初始化 Ray 上下文。

@ray.remote

函数或类装饰器,指定函数将在另一个进程中执行为任务,或类将执行为 actor。
在另一个进程中作为任务执行或作为 actor 执行的函数或类。

.remote()

在每个远程函数、远程类声明或远程类方法的调用后添加的后缀。
调用远程类方法的后缀。
远程操作是异步的。

ray.put()

将对象存储在对象存储中,并返回其 ID。
此 ID 可用于将对象作为参数传递
给任何远程函数或方法调用。
这是一个同步操作。

ray.get()

从对象 ID 或对象 ID 列表返回对象或对象列表
列表。
这是一个同步(即阻塞)操作。

ray.wait()

从对象 ID 列表中,返回
(1)已就绪对象的 ID 列表,和
(2)尚未就绪对象的 ID 列表。
默认情况下,它一次返回一个就绪对象 ID。

本页面报告的所有结果均在配备 2.7 GHz Core i7 CPU 和 16GB RAM 的 13 英寸 MacBook Pro 上获得。虽然 ray.init() 在单台机器上运行时会自动检测 CPU 核心数,但为了减少您在运行以下代码时在您的机器上观察到的结果的可变性,我们在此指定 num_cpus = 4,即一台具有 4 个 CPU 的机器。

由于每个任务默认请求一个 CPU,因此此设置允许我们并行执行多达四个任务。因此,我们的 Ray 系统由一个执行程序的驱动程序组成,以及最多四个运行远程任务或 actor 的工作程序。

技巧 1:延迟 ray.get()#

使用 Ray,每次远程操作(例如,任务、actor 方法)的调用都是异步的。这意味着操作立即返回一个 Promise/Future,它本质上是操作结果的标识符(ID)。这是实现并行性的关键,因为它允许驱动程序并行启动多个操作。要获取实际结果,程序员需要调用 ID 上的 ray.get()。此调用会阻塞,直到结果可用。作为副作用,此操作还会阻止驱动程序调用其他操作,这会损害并行性。

不幸的是,Ray 新手很容易不经意间使用 ray.get()。为了说明这一点,请考虑以下简单的 Python 代码,该代码调用 do_some_work() 函数四次,每次调用大约需要 1 秒。

import ray
import time

def do_some_work(x):
    time.sleep(1) # Replace this with work you need to do.
    return x

start = time.time()
results = [do_some_work(x) for x in range(4)]
print("duration =", time.time() - start)
print("results =", results)

下面是程序执行的输出。不出所料,程序大约需要 4 秒。

duration = 4.0149290561676025
results = [0, 1, 2, 3]

现在,让我们使用 Ray 来并行化上面的程序。一些新手用户会通过简单地将函数设为远程来做到这一点,即:

import time
import ray

ray.init(num_cpus=4) # Specify this system has 4 CPUs.

@ray.remote
def do_some_work(x):
    time.sleep(1) # Replace this with work you need to do.
    return x

start = time.time()
results = [do_some_work.remote(x) for x in range(4)]
print("duration =", time.time() - start)
print("results =", results)

但是,执行上述程序时会得到:

duration = 0.0003619194030761719
results = [ObjectRef(df5a1a828c9685d3ffffffff0100000001000000), ObjectRef(cb230a572350ff44ffffffff0100000001000000), ObjectRef(7bbd90284b71e599ffffffff0100000001000000), ObjectRef(bd37d2621480fc7dffffffff0100000001000000)]

观察此输出,有两点引人注目。首先,程序立即完成,即不到 1 毫秒。其次,我们得到了一堆标识符,而不是预期的结果(即 [0, 1, 2, 3])。请记住,远程操作是异步的,它们返回 Future(即对象 ID),而不是结果本身。这正是我们在这里看到的。我们只测量调用任务所需的时间,而不是它们的运行时间,我们得到对应于四个任务的结果的 ID。

要获取实际结果,我们需要使用 ray.get(),此时第一个想法是直接在远程操作调用上调用 ray.get(),即用以下方式替换第 12 行:

results = [ray.get(do_some_work.remote(x)) for x in range(4)]

在该更改后重新运行程序,我们得到:

duration = 4.018050909042358
results =  [0, 1, 2, 3]

因此,结果现在是正确的,但仍然需要 4 秒,因此没有加速!这是怎么回事?细心的读者已经知道了答案:ray.get() 是阻塞的,因此在每次远程操作后调用它意味着我们等待该操作完成,这基本上意味着我们一次执行一个操作,因此没有并行性!

为了实现并行性,我们需要在调用所有任务后调用 ray.get()。我们可以很容易地在我们的示例中通过用以下方式替换第 12 行来实现:

results = ray.get([do_some_work.remote(x) for x in range(4)])

在此更改后重新运行程序,我们现在得到:

duration = 1.0064549446105957
results =  [0, 1, 2, 3]

终于成功了!我们的 Ray 程序现在只需 1 秒即可运行,这意味着所有 do_some_work() 的调用都在并行运行。

总之,请始终牢记 ray.get() 是一个阻塞操作,因此如果过早调用,它可能会损害并行性。相反,您应该尝试以这样一种方式编写程序,即 ray.get() 尽可能晚地被调用。

技巧 2:避免微小任务#

当首次开发者想要用 Ray 并行化其代码时,自然的想法是让每个函数或类都远程化。不幸的是,这可能导致不良后果;如果任务很小,Ray 程序可能比等效的 Python 程序花费更长的时间。

让我们再次考虑上面的例子,但这次我们将任务缩短(即,每个任务仅需 0.1 毫秒),并将任务调用次数大幅增加到 100,000。

import time

def tiny_work(x):
    time.sleep(0.0001) # Replace this with work you need to do.
    return x

start = time.time()
results = [tiny_work(x) for x in range(100000)]
print("duration =", time.time() - start)

运行此程序,我们得到:

duration = 13.36544418334961

这个结果应该是预料之中的,因为执行 100,000 个耗时 0.1 毫秒的任务的最低限度是 10 秒,我们还需要加上其他开销,例如函数调用等。

现在,让我们使用 Ray 并行化这段代码,方法是让 tiny_work() 的每次调用都远程执行:

import time
import ray

@ray.remote
def tiny_work(x):
    time.sleep(0.0001) # Replace this with work you need to do.
    return x

start = time.time()
result_ids = [tiny_work.remote(x) for x in range(100000)]
results = ray.get(result_ids)
print("duration =", time.time() - start)

运行此代码的结果是:

duration = 27.46447515487671

令人惊讶的是,Ray 不仅没有提高执行时间,而且 Ray 程序实际上比顺序程序还要慢!怎么回事?问题在于,每次任务调用都有相当大的开销(例如,调度、进程间通信、更新系统状态),而这个开销占了任务实际执行时间的主导地位。

一种加快此程序速度的方法是增大远程任务的大小,以摊销调用开销。以下是其中一种可能的解决方案,我们将 1000 个 tiny_work() 函数调用聚合到一个更大的远程函数中:

import time
import ray

def tiny_work(x):
    time.sleep(0.0001) # replace this is with work you need to do
    return x

@ray.remote
def mega_work(start, end):
    return [tiny_work(x) for x in range(start, end)]

start = time.time()
result_ids = []
[result_ids.append(mega_work.remote(x*1000, (x+1)*1000)) for x in range(100)]
results = ray.get(result_ids)
print("duration =", time.time() - start)

现在,如果我们运行上面的程序,我们会得到:

duration = 3.2539820671081543

这大约是顺序执行的四分之一,符合我们的预期(回想一下,我们可以并行运行四个任务)。当然,自然的问题是,一个任务需要多大才能摊销远程调用开销。要找到答案的一种方法是运行以下简单程序来估算每个任务的调用开销:

@ray.remote
def no_work(x):
    return x

start = time.time()
num_calls = 1000
[ray.get(no_work.remote(x)) for x in range(num_calls)]
print("per task overhead (ms) =", (time.time() - start)*1000/num_calls)

在 2018 MacBook Pro 笔记本上运行上述程序显示:

per task overhead (ms) = 0.4739549160003662

换句话说,执行一个空任务几乎需要半毫秒。这表明我们需要确保一个任务至少需要几毫秒才能摊销调用开销。一个注意事项是,每个任务的开销在不同机器上会有所不同,以及在同一台机器上运行的任务与远程运行的任务之间也会有所不同。尽管如此,确保任务至少运行几毫秒是开发 Ray 程序的一个好经验法则。

技巧 3:避免将同一对象反复传递给远程任务#

当我们向远程函数传递一个大对象作为参数时,Ray 会在后台调用 ray.put() 将该对象存储在本地对象存储中。当远程任务在本地执行时,这可以显著提高远程任务调用的性能,因为所有本地任务共享对象存储。

然而,有些情况下,自动调用 ray.put() 来处理任务调用会导致性能问题。一个例子是反复传递同一个大对象,如下面的程序所示:

import time
import numpy as np
import ray

@ray.remote
def no_work(a):
    return

start = time.time()
a = np.zeros((5000, 5000))
result_ids = [no_work.remote(a) for x in range(10)]
results = ray.get(result_ids)
print("duration =", time.time() - start)

此程序输出:

duration = 1.0837509632110596

对于一个只调用 10 个空远程任务的程序来说,这个运行时间相当长。导致这种意想不到的高运行时间的原因是,每次调用 no_work(a) 时,Ray 都会调用 ray.put(a),这会导致将数组 a 复制到对象存储中。由于数组 a 有 250 万个条目,复制它需要相当长的时间。

为了避免每次调用 no_work() 时都复制数组 a,一个简单的解决方案是显式调用 ray.put(a),然后将 a 的 ID 传递给 no_work(),如下所示:

import time
import numpy as np
import ray

ray.init(num_cpus=4)

@ray.remote
def no_work(a):
    return

start = time.time()
a_id = ray.put(np.zeros((5000, 5000)))
result_ids = [no_work.remote(a_id) for x in range(10)]
results = ray.get(result_ids)
print("duration =", time.time() - start)

运行此程序仅需要:

duration = 0.132796049118042

这比原始程序快 7 倍,这是可以预期的,因为调用 no_work(a) 的主要开销是将数组 a 复制到对象存储中,而现在这只发生一次。

避免将同一对象多次复制到对象存储的一个更重要的优势是,它可以防止对象存储过早填满并产生对象逐出的成本。

技巧 4:流水线式数据处理#

如果我们对多个任务的结果使用 ray.get(),我们将不得不等到最后一个任务完成。如果任务花费的时间差异很大,这可能会成为一个问题。

为了说明这个问题,请考虑以下示例,其中我们并行运行四个 do_some_work() 任务,每个任务需要 0 到 4 秒之间均匀分布的时间。接下来,假设这些任务的结果由 process_results() 处理,该函数每个结果需要 1 秒。预期的运行时间是(1)最慢的 do_some_work() 任务的执行时间,加上(2)4 秒,这是执行 process_results() 的时间。

import time
import random
import ray

@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4)) # Replace this with work you need to do.
    return x

def process_results(results):
    sum = 0
    for x in results:
        time.sleep(1) # Replace this with some processing code.
        sum += x
    return sum

start = time.time()
data_list = ray.get([do_some_work.remote(x) for x in range(4)])
sum = process_results(data_list)
print("duration =", time.time() - start, "\nresult = ", sum)

程序输出显示运行时间接近 8 秒:

duration = 7.82636022567749
result =  6

当其他任务可能早已完成时,等待最后一个任务完成会不必要地增加程序的运行时间。一个更好的解决方案是尽快处理数据。幸运的是,Ray 允许您通过在对象 ID 列表中调用 ray.wait() 来做到这一点。不指定任何其他参数,此函数将尽快返回其参数列表中的一个对象。此调用有两个返回值:(1)就绪对象的 ID,和(2)包含尚未就绪对象的 ID 的列表。修改后的程序如下。请注意,我们需要进行的一项更改是用 process_incremental() 替换 process_results(),该函数一次处理一个结果。

import time
import random
import ray

@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4)) # Replace this with work you need to do.
    return x

def process_incremental(sum, result):
    time.sleep(1) # Replace this with some processing code.
    return sum + result

start = time.time()
result_ids = [do_some_work.remote(x) for x in range(4)]
sum = 0
while len(result_ids):
    done_id, result_ids = ray.wait(result_ids)
    sum = process_incremental(sum, ray.get(done_id[0]))
print("duration =", time.time() - start, "\nresult = ", sum)

此程序现在仅需 4.8 秒多一点即可运行,这是一个显著的改进:

duration = 4.852453231811523
result =  6

为了便于理解,图 1 显示了两种情况下的执行时间线:一种是使用 ray.get() 在所有结果都可用后再进行处理,另一种是使用 ray.wait() 在结果可用后尽快进行处理。

../_images/pipeline.png

图 1:(a) 使用 ray.get() 等待 do_some_work() 任务的所有结果可用后再调用 process_results() 时的执行时间线。(b) 使用 ray.wait() 在结果可用后尽快处理时的执行时间线。#