首次使用用户的小贴士#

Ray 提供了一个高度灵活、极简且易于使用的 API。在本页,我们将介绍一些小贴士,帮助首次使用 Ray 的用户避免一些可能显著损害程序性能的常见错误。有关高级设计模式的深入讨论,请阅读 核心设计模式

本文档中使用的核心 Ray API。#

API

描述

ray.init()

初始化 Ray 上下文。

@ray.remote

函数或类装饰器,指定函数将作为
任务执行,或类作为 Actor 在不同进程中执行。

.remote()

每个远程函数、远程类声明或
远程类方法调用的后缀。
远程操作是异步的。

ray.put()

将对象存储在对象存储中,并返回其 ID。
该 ID 可用于将对象作为参数传递
给任何远程函数或方法调用。
这是一个同步操作。

ray.get()

从对象 ID
或对象 ID 列表返回对象或对象列表。
这是一个同步(即阻塞)操作。

ray.wait()

从对象 ID 列表中,返回
(1) 已就绪对象的 ID 列表,以及
(2) 尚未就绪对象的 ID 列表。
默认情况下,它一次返回一个就绪的对象 ID。

本页报告的所有结果均在一台配备 2.7 GHz Core i7 CPU 和 16GB RAM 的 13 英寸 MacBook Pro 上获得。虽然 ray.init() 在单机运行时会自动检测核心数,为了减少您在运行以下代码时在您机器上观察到的结果的可变性,我们在此指定 num_cpus = 4,即一台拥有 4 个 CPU 的机器。

由于每个任务默认请求一个 CPU,此设置允许我们最多并行执行四个任务。因此,我们的 Ray 系统包含一个执行程序的 driver,以及最多四个运行远程任务或 Actor 的 worker。

小贴士 1:延迟调用 ray.get()#

使用 Ray,每个远程操作(例如,任务、Actor 方法)的调用都是异步的。这意味着操作会立即返回一个 promise/future,它本质上是操作结果的标识符 (ID)。这是实现并行性的关键,因为它允许 driver 程序并行启动多个操作。要获取实际结果,程序员需要对结果的 ID 调用 ray.get()。此调用会阻塞,直到结果可用为止。副作用是,此操作也会阻塞 driver 程序调用其他操作,这会损害并行性。

不幸的是,新的 Ray 用户很容易无意中错误使用 ray.get()。为了说明这一点,请看以下简单的 Python 代码,它调用函数 do_some_work() 四次,每次调用大约需要 1 秒

import ray
import time

def do_some_work(x):
    time.sleep(1) # Replace this with work you need to do.
    return x

start = time.time()
results = [do_some_work(x) for x in range(4)]
print("duration =", time.time() - start)
print("results =", results)

程序执行的输出如下。正如所料,程序耗时约 4 秒

duration = 4.0149290561676025
results = [0, 1, 2, 3]

现在,让我们使用 Ray 并行化上述程序。一些首次使用的用户可能会仅仅通过将函数设为远程来完成此操作,即:

import time
import ray

ray.init(num_cpus=4) # Specify this system has 4 CPUs.

@ray.remote
def do_some_work(x):
    time.sleep(1) # Replace this with work you need to do.
    return x

start = time.time()
results = [do_some_work.remote(x) for x in range(4)]
print("duration =", time.time() - start)
print("results =", results)

然而,执行上述程序后得到

duration = 0.0003619194030761719
results = [ObjectRef(df5a1a828c9685d3ffffffff0100000001000000), ObjectRef(cb230a572350ff44ffffffff0100000001000000), ObjectRef(7bbd90284b71e599ffffffff0100000001000000), ObjectRef(bd37d2621480fc7dffffffff0100000001000000)]

当我们查看此输出时,有两点很突出。首先,程序立即完成,即不到 1 毫秒。其次,我们没有得到期望的结果(即 [0, 1, 2, 3]),而是得到一堆标识符。回想一下,远程操作是异步的,它们返回的是 futures(即对象 ID)而不是结果本身。这正是我们在此看到的。我们只测量了调用任务所需的时间,而不是它们的运行时间,我们得到的是对应于这四个任务的结果的 ID。

要获得实际结果,我们需要使用 ray.get(),这里第一反应就是在远程操作调用上直接调用 ray.get(),即用以下内容替换第 12 行

results = [ray.get(do_some_work.remote(x)) for x in range(4)]

更改后重新运行程序,我们得到

duration = 4.018050909042358
results =  [0, 1, 2, 3]

现在结果是正确的了,但仍然需要 4 秒,所以没有加速!这是怎么回事?细心的读者已经知道答案了:ray.get() 是阻塞的,所以在每次远程操作后调用它意味着我们等待该操作完成,这实质上意味着我们一次执行一个操作,因此没有并行性!

要启用并行性,我们需要在调用所有任务之后再调用 ray.get()。在我们的示例中,我们可以通过用以下内容替换第 12 行来轻松实现:

results = ray.get([do_some_work.remote(x) for x in range(4)])

更改后重新运行程序,我们现在得到

duration = 1.0064549446105957
results =  [0, 1, 2, 3]

终于成功了!我们的 Ray 程序现在只需 1 秒即可运行完成,这意味着所有 do_some_work() 的调用都并行运行了。

总之,请始终记住 ray.get() 是一个阻塞操作,如果急切地调用它,可能会损害并行性。相反,您应该尝试编写程序,使得 ray.get() 被尽可能晚地调用。

小贴士 2:避免微小任务#

当首次开发者想要使用 Ray 并行化他们的代码时,很自然的本能是将每个函数或类都设为远程。不幸的是,这可能导致不良后果;如果任务非常小,Ray 程序可能会比等效的 Python 程序花费更长时间。

让我们再次考虑上面的示例,但这次我们将任务缩短很多(即每个只需 0.1 毫秒),并将任务调用次数显著增加到 100,000 次。

import time

def tiny_work(x):
    time.sleep(0.0001) # Replace this with work you need to do.
    return x

start = time.time()
results = [tiny_work(x) for x in range(100000)]
print("duration =", time.time() - start)

运行此程序后,我们得到

duration = 13.36544418334961

此结果是预期的,因为执行 100,000 个每个耗时 0.1 毫秒的任务的下限是 10 秒,此外我们还需要加上函数调用等其他开销。

现在,让我们使用 Ray 并行化此代码,将 tiny_work() 的每次调用都设为远程

import time
import ray

@ray.remote
def tiny_work(x):
    time.sleep(0.0001) # Replace this with work you need to do.
    return x

start = time.time()
result_ids = [tiny_work.remote(x) for x in range(100000)]
results = ray.get(result_ids)
print("duration =", time.time() - start)

运行此代码的结果是

duration = 27.46447515487671

令人惊讶的是,Ray 不仅没有缩短执行时间,而且 Ray 程序实际上比顺序程序更慢!这是怎么回事?问题在于,每次任务调用都有不小的开销(例如,调度、进程间通信、更新系统状态),而这种开销超过了执行任务本身的实际时间。

加速此程序的一种方法是增大远程任务的粒度,以分摊调用开销。这里提供一种可能的解决方案,我们将 1000 个 tiny_work() 函数调用聚合到一个更大的远程函数中

import time
import ray

def tiny_work(x):
    time.sleep(0.0001) # replace this is with work you need to do
    return x

@ray.remote
def mega_work(start, end):
    return [tiny_work(x) for x in range(start, end)]

start = time.time()
result_ids = []
[result_ids.append(mega_work.remote(x*1000, (x+1)*1000)) for x in range(100)]
results = ray.get(result_ids)
print("duration =", time.time() - start)

现在,如果我们运行上述程序,我们得到

duration = 3.2539820671081543

这大约是顺序执行时间的四分之一,符合我们的预期(回想一下,我们可以并行运行四个任务)。当然,自然的问题是任务需要多大才能分摊远程调用开销。一种查找方法是运行以下简单程序来估计每个任务的调用开销

@ray.remote
def no_work(x):
    return x

start = time.time()
num_calls = 1000
[ray.get(no_work.remote(x)) for x in range(num_calls)]
print("per task overhead (ms) =", (time.time() - start)*1000/num_calls)

在 2018 年款 MacBook Pro 笔记本电脑上运行上述程序显示

per task overhead (ms) = 0.4739549160003662

换句话说,执行一个空任务大约需要半毫秒。这表明我们需要确保任务至少花费几毫秒,以分摊调用开销。需要注意的是,每个任务的开销会因机器而异,并且在同一机器上运行的任务与远程运行的任务之间也不同。话虽如此,在开发 Ray 程序时,确保任务至少耗时几毫秒是一个很好的经验法则。

小贴士 3:避免重复将同一对象传递给远程任务#

当我们向远程函数传递一个大对象作为参数时,Ray 会在底层调用 ray.put() 将该对象存储在本地对象存储中。当远程任务在本地执行时,这可以显著提高远程任务调用的性能,因为所有本地任务都共享对象存储。

然而,有些情况下,在任务调用时自动调用 ray.put() 会导致性能问题。一个例子是重复将同一个大对象作为参数传递,如下面的程序所示

import time
import numpy as np
import ray

@ray.remote
def no_work(a):
    return

start = time.time()
a = np.zeros((5000, 5000))
result_ids = [no_work.remote(a) for x in range(10)]
results = ray.get(result_ids)
print("duration =", time.time() - start)

此程序输出

duration = 1.0837509632110596

对于一个只调用 10 个不做任何事情的远程任务的程序来说,这个运行时间相当长。这种出乎意料的高运行时间的原因是,每次我们调用 no_work(a) 时,Ray 都会调用 ray.put(a),这会导致将数组 a 复制到对象存储中。由于数组 a 有 250 万个条目,复制它需要相当长的时间。

为了避免每次调用数组 a 时都复制 no_work(),一个简单的解决方案是显式调用 ray.put(a),然后将 a 的 ID 传递给 no_work(),如下所示

import time
import numpy as np
import ray

ray.init(num_cpus=4)

@ray.remote
def no_work(a):
    return

start = time.time()
a_id = ray.put(np.zeros((5000, 5000)))
result_ids = [no_work.remote(a_id) for x in range(10)]
results = ray.get(result_ids)
print("duration =", time.time() - start)

运行此程序仅需

duration = 0.132796049118042

这比原始程序快 7 倍,这是预期之中的,因为调用 no_work(a) 的主要开销是将数组 a 复制到对象存储,现在这只发生一次。

可以说,避免将同一对象的多个副本写入对象存储的一个更重要优势在于,它可以防止对象存储过早填满并产生对象驱逐的开销。

小贴士 4:流水线处理数据#

如果我们对多个任务的结果使用 ray.get(),我们将不得不等到这些任务中的最后一个完成。如果任务耗时差异很大,这可能会成为一个问题。

为了说明这个问题,考虑以下示例,我们并行运行四个 do_some_work() 任务,每个任务耗时均匀分布在 0 到 4 秒之间。接下来,假设这些任务的结果由 process_results() 处理,每个结果需要 1 秒。那么预期的运行时间是 (1) 执行最慢的 do_some_work() 任务所需的时间,加上 (2) 执行 process_results() 所需的 4 秒。

import time
import random
import ray

@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4)) # Replace this with work you need to do.
    return x

def process_results(results):
    sum = 0
    for x in results:
        time.sleep(1) # Replace this with some processing code.
        sum += x
    return sum

start = time.time()
data_list = ray.get([do_some_work.remote(x) for x in range(4)])
sum = process_results(data_list)
print("duration =", time.time() - start, "\nresult = ", sum)

程序输出显示运行时间接近 8 秒

duration = 7.82636022567749
result =  6

当其他任务可能早就完成了,等待最后一个任务完成会不必要地增加程序的运行时间。更好的解决方案是数据一可用就立即处理。幸运的是,Ray 允许您通过对对象 ID 列表调用 ray.wait() 来做到这一点。如果不指定任何其他参数,此函数会在其参数列表中的某个对象就绪时立即返回。此调用有两个返回值:(1) 已就绪对象的 ID,以及 (2) 包含尚未就绪对象 ID 的列表。修改后的程序如下所示。请注意,我们需要进行的一项更改是用 process_incremental() 替换 process_results(),后者一次处理一个结果。

import time
import random
import ray

@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4)) # Replace this with work you need to do.
    return x

def process_incremental(sum, result):
    time.sleep(1) # Replace this with some processing code.
    return sum + result

start = time.time()
result_ids = [do_some_work.remote(x) for x in range(4)]
sum = 0
while len(result_ids):
    done_id, result_ids = ray.wait(result_ids)
    sum = process_incremental(sum, ray.get(done_id[0]))
print("duration =", time.time() - start, "\nresult = ", sum)

此程序现在只需略多于 4.8 秒,这是一个显著的改进

duration = 4.852453231811523
result =  6

为了便于理解,图 1 显示了两种情况下的执行时间线:使用 ray.get() 等待所有结果可用后才处理,以及使用 ray.wait() 在结果一可用就立即开始处理。

../_images/pipeline.png

图 1:(a) 使用 ray.get() 等待所有 do_some_work() 任务结果就绪后才调用 process_results() 的执行时间线。(b) 使用 ray.wait() 在结果一可用就立即处理的执行时间线。#