Ray Core 入门教程(示例版)#
通过示例实现 Ray Core 中的一个函数,来理解 Ray 的工作原理及其基本概念。从经验较少的 Python 程序员到对高级任务感兴趣的程序员,都可以通过学习 Ray Core API,使用 Python 开始分布式计算。
安装 Ray#
使用以下命令安装 Ray
! pip install ray
Ray Core#
通过运行以下命令来启动本地集群
import ray
ray.init()
请注意输出中的以下几行
... INFO services.py:1263 -- View the Ray dashboard at http://127.0.0.1:8265
{'node_ip_address': '192.168.1.41',
...
'node_id': '...'}
这些消息表明 Ray 集群工作正常。在此示例输出中,Ray Dashboard 的地址是 http://127.0.0.1:8265。请在输出的第一行找到地址并访问 Ray Dashboard。Ray Dashboard 显示的信息包括可用的 CPU 核心数量以及当前 Ray 应用程序的总利用率。这是笔记本电脑的典型输出
{'CPU': 12.0,
'memory': 14203886388.0,
'node:127.0.0.1': 1.0,
'object_store_memory': 2147483648.0}
接下来,将简要介绍 Ray Core API,我们称之为 Ray API。Ray API 基于 Python 程序员熟悉的装饰器、函数和类等概念构建。它是分布式计算的通用编程接口。其底层引擎负责处理复杂的工作,使开发人员能够将 Ray 与现有的 Python 库和系统结合使用。
您的第一个 Ray API 示例#
以下函数从数据库中检索并处理数据。这个虚拟的 database 是一个普通的 Python 列表,其中包含《Learning Ray》一书标题的单词。 sleep 函数会暂停一段时间,以模拟访问和处理数据库数据的成本。
import time
database = [
"Learning", "Ray",
"Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]
def retrieve(item):
time.sleep(item / 10.)
return item, database[item]
如果索引为 5 的项需要半秒钟 (5 / 10.),那么顺序检索所有八项的总运行时间的估计是 (0+1+2+3+4+5+6+7)/10. = 2.8 秒。运行以下代码以获取实际时间
def print_runtime(input_data, start_time):
print(f'Runtime: {time.time() - start_time:.2f} seconds, data:')
print(*input_data, sep="\n")
start = time.time()
data = [retrieve(item) for item in range(8)]
print_runtime(data, start)
Runtime: 2.82 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
在此示例中,运行函数的总时间为 2.82 秒,但您的计算机上可能有所不同。请注意,这个基础的 Python 版本无法同时运行该函数。
您可能会认为 Python 列表推导式更有效率。测得的 2.8 秒运行时间实际上是最坏的情况。尽管此程序大部分时间都在“休眠”,但由于全局解释器锁 (GIL) 的存在,它的速度仍然很慢。
Ray 任务#
此任务可以从并行化中获益。如果完美分布式,运行时间不应比最慢的子任务长太多,即 7/10. = 0.7 秒。要将此示例扩展到在 Ray 上并行运行,请先使用 @ray.remote 装饰器
import ray
@ray.remote
def retrieve_task(item):
return retrieve(item)
使用该装饰器后,函数 retrieve_task 就变成了一个 :ref:ray-remote-functions<Ray task>_。Ray 任务是 Ray 在与调用它的进程不同的进程中执行的函数,甚至可能在不同的机器上执行。
Ray 的使用非常方便,因为您可以继续编写 Python 代码,而无需大幅改变您的方法或编程风格。在 retrieve 函数上使用 :func:ray.remote()<@ray.remote> 装饰器是装饰器的预期用法,并且在此示例中您并未修改原始代码。
要检索数据库条目并衡量性能,您无需对代码进行太多更改。以下是该过程的概述
start = time.time()
object_references = [
retrieve_task.remote(item) for item in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)
2022-12-20 13:52:34,632 INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
Runtime: 0.71 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
并行运行任务需要进行两次小的代码修改。要远程执行您的 Ray 任务,必须使用 .remote() 调用。Ray 异步执行远程任务,即使在本地集群上也是如此。代码片段中 object_references 列表中的项并不直接包含结果。如果使用 type(object_references[0]) 检查第一个项的 Python 类型,您会发现它实际上是一个 ObjectRef。这些对象引用对应于您需要请求结果的 *futures*。 :func:ray.get()<ray.get(...)> 调用用于请求结果。每当您对 Ray 任务调用 remote 时,它会立即返回一个或多个对象引用。可以将 Ray 任务视为创建对象的主要方式。以下部分是一个将多个任务链接在一起的示例,并允许 Ray 在它们之间传递和解析对象。
让我们回顾一下之前的步骤。您从一个 Python 函数开始,然后用 @ray.remote 装饰它,使其成为一个 Ray 任务。您没有直接在代码中调用原始函数,而是对 Ray 任务调用了 .remote(...)。最后,您使用 .get(...) 从 Ray 集群中检索了结果。可以尝试将 Ray 任务从您自己的函数中创建作为一项额外的练习。
让我们回顾一下使用 Ray 任务获得的性能提升。在大多数笔记本电脑上,运行时间约为 0.71 秒,略高于最慢的子任务 0.7 秒。您可以通过利用 Ray 的更多 API 来进一步改进程序。
对象存储#
retrieve 定义直接访问 database 中的项。虽然这在本地 Ray 集群上运行良好,但请考虑它在实际集群(包含多台计算机)上如何工作。Ray 集群有一个带有驱动程序进程的 head 节点,以及多个带有执行任务的 worker 进程的 worker 节点。在这种情况下,数据库仅在驱动程序上定义,但 worker 进程需要访问它才能运行 retrieve 任务。Ray 共享驱动程序和 worker 之间或 worker 之间对象的方法是使用 ray.put 函数将数据放入 Ray 的分布式对象存储中。在 retrieve_task 定义中,您可以添加一个 db 参数,稍后将其作为 db_object_ref 对象传递。
db_object_ref = ray.put(database)
@ray.remote
def retrieve_task(item, db):
time.sleep(item / 10.)
return item, db[item]
通过使用对象存储,您可以让 Ray 管理整个集群中的数据访问。虽然对象存储会带来一些开销,但它提高了处理大型数据集的性能。对于真正的分布式环境来说,这一步至关重要。使用 retrieve_task 函数重新运行该示例,以确认其按预期执行。
非阻塞调用#
在上一节中,您使用了 ray.get(object_references) 来检索结果。此调用会阻塞驱动程序进程,直到所有结果都可用。如果每个数据库项需要几分钟才能处理,这种依赖关系可能会导致问题。如果允许驱动程序进程在等待结果的同时执行其他任务,并按完成顺序处理结果而不是等待所有项完成,则可以获得更高的效率。此外,如果由于数据库连接死锁等问题而无法检索某个数据库项,驱动程序将无限期挂起。为防止无限期挂起,请在使用 wait 函数时设置合理的 timeout 值。例如,如果您想等待的时间不到最慢数据检索任务时间的十倍,请使用 wait 函数在超过该时间后停止任务。
start = time.time()
object_references = [
retrieve_task.remote(item, db_object_ref) for item in range(8)
]
all_data = []
while len(object_references) > 0:
finished, object_references = ray.wait(
object_references, timeout=7.0
)
data = ray.get(finished)
print_runtime(data, start)
all_data.extend(data)
print_runtime(all_data, start)
Runtime: 0.11 seconds, data:
(0, 'Learning')
(1, 'Ray')
Runtime: 0.31 seconds, data:
(2, 'Flexible')
(3, 'Distributed')
Runtime: 0.51 seconds, data:
(4, 'Python')
(5, 'for')
Runtime: 0.71 seconds, data:
(6, 'Machine')
(7, 'Learning')
Runtime: 0.71 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
您可以在 while 循环中使用检索到的值来启动其他 worker 上的新任务,而不是打印结果。
任务依赖#
您可能希望对检索到的数据执行额外的处理任务。例如,使用第一个检索任务的结果从同一数据库(可能来自不同的表)查询其他相关数据。下面的代码设置了这个后续任务,并按顺序执行 retrieve_task 和 follow_up_task。
@ray.remote
def follow_up_task(retrieve_result):
original_item, _ = retrieve_result
follow_up_result = retrieve(original_item + 1)
return retrieve_result, follow_up_result
retrieve_refs = [retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]]
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]
result = [print(data) for data in ray.get(follow_up_refs)]
((0, 'Learning'), (1, 'Ray'))
((2, 'Flexible'), (3, 'Distributed'))
((4, 'Python'), (5, 'for'))
((6, 'Machine'), (7, 'Learning'))
如果您不熟悉异步编程,这个示例可能不会特别令人印象深刻。然而,仔细一看,代码竟然能运行可能会让人感到惊讶。该代码看起来像一个普通的 Python 函数,带有几个列表推导式。
follow_up_task 的函数体期望其输入参数 retrieve_result 是一个 Python 元组。但是,当您使用 [follow_up_task.remote(ref) for ref in retrieve_refs] 命令时,您并没有将元组传递给后续任务。相反,您正在使用 retrieve_refs 来传入 Ray 对象引用。
在幕后,Ray 会识别出 follow_up_task 需要实际值,因此它会*自动*使用 ray.get 函数来解析这些 futures。此外,Ray 会为所有任务创建一个依赖图,并以尊重其依赖关系的方式执行它们。您无需显式地告诉 Ray 何时等待前一个任务完成——它会推断执行顺序。Ray 对象存储的这一特性很有用,因为通过将对象引用传递给下一个任务并让 Ray 处理其余部分,您可以避免将大型中间值复制回驱动程序。
一旦专门用于检索信息的任务完成,下一阶段的处理才会调度。事实上,如果 retrieve_refs 被命名为 retrieve_result,您可能不会注意到这个关键且故意的命名细微之处。Ray 允许您专注于您的工作,而不是集群计算的技术细节。两个任务的依赖图如下所示

Ray Actor#
此示例涵盖了 Ray Core 的另一个重要方面。直到这一步,一切基本上都是一个函数。您使用了 @ray.remote 装饰器来使某些函数远程化,但除此之外,您只使用了标准的 Python。
如果您想跟踪数据库被查询的次数,可以计算 retrieve 任务的结果。但是,是否有更有效的方法来做到这一点?理想情况下,您希望以一种能够处理大量数据的分布式方式进行跟踪。Ray 提供了 Actor 解决方案,Actor 在集群上运行有状态计算,并且还可以相互通信。与使用装饰函数创建 Ray 任务类似,您可以使用装饰的 Python 类来创建 Ray Actor。因此,您可以使用 Ray Actor 创建一个简单的计数器来跟踪数据库调用的次数。
@ray.remote
class DataTracker:
def __init__(self):
self._counts = 0
def increment(self):
self._counts += 1
def counts(self):
return self._counts
当您为 DataTracker 类添加 ray.remote 装饰器时,它就变成了一个 Actor。该 Actor 能够跟踪状态(例如计数),并且其方法是 Ray Actor 任务,您可以使用 .remote() 以与调用函数相同的方式调用它们。修改 retrieve_task 以整合此 Actor。
@ray.remote
def retrieve_tracker_task(item, tracker, db):
time.sleep(item / 10.)
tracker.increment.remote()
return item, db[item]
tracker = DataTracker.remote()
object_references = [
retrieve_tracker_task.remote(item, tracker, db_object_ref) for item in range(8)
]
data = ray.get(object_references)
print(data)
print(ray.get(tracker.counts.remote()))
[(0, 'Learning'), (1, 'Ray'), (2, 'Flexible'), (3, 'Distributed'), (4, 'Python'), (5, 'for'), (6, 'Machine'), (7, 'Learning')]
8
正如预期的那样,此计算的结果是 8。虽然您不需要 Actor 来执行此计算,但这演示了一种在整个集群中维护状态的方法,可能涉及多个任务。实际上,您可以将 Actor 传递给任何相关任务,甚至传递给另一个 Actor 的构造函数。Ray API 非常灵活,可以实现无限的可能性。分布式 Python 工具很少支持有状态计算,这对于运行复杂的分布式算法(如强化学习)特别有用。
总结#
在此示例中,您只使用了六个 API 方法。其中包括 ray.init() 来初始化集群,@ray.remote 将函数和类转换为任务和 Actor,ray.put() 将值传输到 Ray 的对象存储中,以及 ray.get() 从集群中检索对象。此外,您在 Actor 方法或任务上使用了 .remote() 来在集群上执行代码,并使用了 ray.wait 来防止阻塞调用。
Ray API 包含的不仅仅是这六个调用,但如果您刚开始接触,这六个调用就已经非常强大了。更普遍地总结,这些方法如下
ray.init():初始化您的 Ray 集群。传递一个地址以连接到现有集群。@ray.remote:将函数转换为任务,将类转换为 Actor。ray.put():将值放入 Ray 的对象存储中。ray.get():从对象存储中获取值。返回您放入其中的值,或由任务或 Actor 计算出的值。.remote():在您的 Ray 集群上运行 Actor 方法或任务,并用于实例化 Actor。ray.wait():返回两个对象引用列表,一个包含我们正在等待的已完成任务,另一个包含未完成的任务。
想了解更多?#
此示例是我们“Learning Ray》一书中 Ray Core 演练的简化版本。如果您喜欢它,请查看 Ray Core 示例库 或我们 用例库 中的一些 ML 工作负载。