Ray Core 入门示例#
在 Ray Core 中实现一个函数,以理解 Ray 的工作原理及其基本概念。无论是经验较少的 Python 程序员,还是对高级任务感兴趣的程序员,都可以通过学习 Ray Core API 来开始使用 Python 进行分布式计算。
安装 Ray#
使用以下命令安装 Ray
! pip install ray
Ray Core#
运行以下命令启动本地集群
import ray
ray.init()
注意输出中的以下行
... INFO services.py:1263 -- View the Ray dashboard at http://127.0.0.1:8265
{'node_ip_address': '192.168.1.41',
...
'node_id': '...'}
这些消息表明 Ray 集群正在按预期工作。在此示例输出中,Ray dashboard 的地址是 http://127.0.0.1:8265
。请通过你输出的第一行地址访问 Ray dashboard。Ray dashboard 会显示诸如可用 CPU 核数以及当前 Ray 应用的总利用率等信息。这是笔记本电脑上的典型输出
{'CPU': 12.0,
'memory': 14203886388.0,
'node:127.0.0.1': 1.0,
'object_store_memory': 2147483648.0}
接下来是对 Ray Core API(我们称之为 Ray API)的简要介绍。Ray API 基于 Python 程序员熟悉的装饰器、函数和类等概念。它是一个用于分布式计算的通用编程接口。引擎负责处理复杂的工作,让开发者能够将 Ray 与现有的 Python 库和系统一起使用。
你的第一个 Ray API 示例#
以下函数从数据库检索和处理数据。虚拟 database
是一个简单的 Python 列表,包含图书 “Learning Ray” 的标题中的词语。sleep
函数会暂停一定时间,以模拟从数据库访问和处理数据的开销。
import time
database = [
"Learning", "Ray",
"Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]
def retrieve(item):
time.sleep(item / 10.)
return item, database[item]
如果索引为 5 的项花费了半秒钟(5 / 10.
),则顺序检索所有八个项的总运行时长估计为 (0+1+2+3+4+5+6+7)/10. = 2.8
秒。运行以下代码获取实际时间
def print_runtime(input_data, start_time):
print(f'Runtime: {time.time() - start_time:.2f} seconds, data:')
print(*input_data, sep="\n")
start = time.time()
data = [retrieve(item) for item in range(8)]
print_runtime(data, start)
Runtime: 2.82 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
在此示例中,运行此函数的总时间为 2.82 秒,但你计算机上的时间可能会有所不同。请注意,这个基本的 Python 版本不能同时运行该函数。
你可能认为 Python 列表推导式更高效。测得的 2.8 秒运行时长实际上是最坏情况。尽管该程序大部分时间都在“休眠”,但由于全局解释器锁(GIL),它还是很慢。
Ray Tasks(任务)#
这个任务可以受益于并行化。如果它完美地分布式,运行时长应该不会比最慢的子任务长很多,即 7/10. = 0.7
秒。要扩展此示例以在 Ray 上并行运行,请首先使用 @ray.remote
装饰器
import ray
@ray.remote
def retrieve_task(item):
return retrieve(item)
使用装饰器,函数 retrieve_task
变成了 :ref:ray-remote-functions<Ray 任务>
_。Ray 任务是 Ray 在与调用它的进程不同的进程(可能在不同的机器上)执行的函数。
Ray 使用起来很方便,因为你可以继续编写 Python 代码,而无需大幅改变你的方法或编程风格。在这个例子中,在 retrieve 函数上使用 :func:ray.remote()<@ray.remote>
装饰器是装饰器的预期用法,你也没有修改原始代码。
要检索数据库条目并衡量性能,你无需对代码进行太多更改。以下是该过程的概述
start = time.time()
object_references = [
retrieve_task.remote(item) for item in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)
2022-12-20 13:52:34,632 INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
Runtime: 0.71 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
并行运行任务需要对代码进行两次微小的修改。要远程执行 Ray 任务,你必须使用 .remote()
调用。Ray 即使在本地集群上也会异步执行远程任务。代码片段中的 object_references
列表中的项不直接包含结果。如果你使用 type(object_references[0])
检查第一项的 Python 类型,你会发现它实际上是一个 ObjectRef
。这些对象引用对应于需要请求结果的 *futures*。调用 :func:ray.get()<ray.get(...)>
用于请求结果。无论何时你在 Ray 任务上调用 remote,它都会立即返回一个或多个对象引用。将 Ray 任务视为创建对象的主要方式。下一节是一个示例,它将多个任务链接在一起,并允许 Ray 在它们之间传递和解析对象。
我们回顾一下前面的步骤。你从一个 Python 函数开始,然后用 @ray.remote
装饰它,使该函数成为一个 Ray 任务。代码中你没有直接调用原始函数,而是对 Ray 任务调用了 .remote(...)
。最后,你使用 .get(...)
从 Ray 集群中检索结果。考虑将你的一个函数创建为 Ray 任务作为一个额外的练习。
我们回顾一下使用 Ray 任务带来的性能提升。在大多数笔记本电脑上,运行时长约为 0.71 秒,略长于最慢的子任务(0.7 秒)。你可以通过利用更多 Ray 的 API 来进一步改进程序。
对象存储#
retrieve 的定义直接访问 database
中的项。这在本地 Ray 集群上运行良好,但请考虑它在由多台计算机组成的实际集群上如何工作。Ray 集群有一个带有 driver 进程的头节点和多个带有 worker 进程执行任务的 worker 节点。在这种情况下,数据库只在 driver 上定义,但 worker 进程需要访问它来运行 retrieve 任务。Ray 解决 driver 和 worker 之间或 worker 之间共享对象的方法是使用 ray.put
函数将数据放入 Ray 的分布式对象存储中。在 retrieve_task
定义中,你可以添加一个 db
参数,稍后作为 db_object_ref
对象传递。
db_object_ref = ray.put(database)
@ray.remote
def retrieve_task(item, db):
time.sleep(item / 10.)
return item, db[item]
通过使用对象存储,你允许 Ray 在整个集群中管理数据访问。尽管对象存储会带来一些开销,但对于大型数据集,它可以提高性能。这一步对于真正的分布式环境至关重要。重新运行带有 retrieve_task
函数的示例,以确认它按预期执行。
非阻塞调用#
在上一节中,你使用 ray.get(object_references)
来检索结果。此调用会阻塞 driver 进程,直到所有结果都可用。如果每个数据库项需要几分钟才能处理,这种依赖关系可能会导致问题。如果你允许 driver 进程在等待结果时执行其他任务,并在结果完成后立即处理它们,而不是等待所有项完成,则可以获得更高的效率。此外,如果由于数据库连接死锁等问题导致某个数据库项无法检索,driver 将无限期地挂起。为防止无限期挂起,在使用 wait
函数时,请设置合理的 timeout
值。例如,如果你希望等待时间少于最慢数据检索任务时间的十倍,请使用 wait
函数在该时间过后停止任务。
start = time.time()
object_references = [
retrieve_task.remote(item, db_object_ref) for item in range(8)
]
all_data = []
while len(object_references) > 0:
finished, object_references = ray.wait(
object_references, timeout=7.0
)
data = ray.get(finished)
print_runtime(data, start)
all_data.extend(data)
print_runtime(all_data, start)
Runtime: 0.11 seconds, data:
(0, 'Learning')
(1, 'Ray')
Runtime: 0.31 seconds, data:
(2, 'Flexible')
(3, 'Distributed')
Runtime: 0.51 seconds, data:
(4, 'Python')
(5, 'for')
Runtime: 0.71 seconds, data:
(6, 'Machine')
(7, 'Learning')
Runtime: 0.71 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
你可以在 while
循环中使用检索到的值来在其他 worker 上启动新任务,而不是打印结果。
任务依赖#
你可能想对检索到的数据执行额外的处理任务。例如,使用第一个检索任务的结果从同一数据库中查询其他相关数据(可能来自不同的表)。下面的代码设置了这个后续任务,并按顺序执行 retrieve_task
和 follow_up_task
。
@ray.remote
def follow_up_task(retrieve_result):
original_item, _ = retrieve_result
follow_up_result = retrieve(original_item + 1)
return retrieve_result, follow_up_result
retrieve_refs = [retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]]
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]
result = [print(data) for data in ray.get(follow_up_refs)]
((0, 'Learning'), (1, 'Ray'))
((2, 'Flexible'), (3, 'Distributed'))
((4, 'Python'), (5, 'for'))
((6, 'Machine'), (7, 'Learning'))
如果你不熟悉异步编程,这个示例可能不会特别令人印象深刻。然而,仔细一看,这段代码竟然能够运行,这可能会让你感到惊讶。这段代码看起来像一个普通的 Python 函数,其中包含一些列表推导式。
follow_up_task
的函数体期望其输入参数 retrieve_result
是一个 Python 元组。然而,当你使用命令 [follow_up_task.remote(ref) for ref in retrieve_refs]
时,你传递给后续任务的并不是元组。相反,你使用的是 retrieve_refs
来传递 Ray 对象引用。
在底层,Ray 识别出 follow_up_task
需要实际值,因此它会自动使用 ray.get
函数来解析这些 futures。此外,Ray 会为所有任务创建依赖图,并按照它们的依赖关系执行任务。你无需明确告诉 Ray 何时等待前一个任务完成——它会推断出执行顺序。Ray 对象存储的这一特性非常有用,因为通过将对象引用传递给下一个任务并让 Ray 处理其余部分,可以避免将大量中间值复制回 driver。
流程中的后续步骤只有在专门设计用于检索信息的任务完成后才会被调度。事实上,如果 retrieve_refs
被命名为 retrieve_result
,你可能不会注意到这个关键且有意为之的命名细微差别。Ray 让你能够专注于你的工作,而不是分布式计算的技术细节。这两个任务的依赖图如下所示
Ray Actors(Actor)#
这个示例涵盖了 Ray Core 的另一个重要方面。到目前为止,一切本质上都是函数。你使用了 @ray.remote
装饰器使某些函数成为远程函数,除此之外,你只使用了标准的 Python。
如果你想跟踪数据库被查询的频率,你可以计算 retrieve 任务的结果。但是,有没有更有效的方法呢?理想情况下,你希望以分布式方式跟踪此信息,以便处理大量数据。Ray 提供了一个解决方案,即使用 actor,它可以在集群上运行有状态计算,并且彼此之间也可以通信。与使用装饰函数创建 Ray 任务类似,你可以使用装饰 Python 类来创建 Ray actor。因此,你可以使用 Ray actor 创建一个简单的计数器来跟踪数据库调用次数。
@ray.remote
class DataTracker:
def __init__(self):
self._counts = 0
def increment(self):
self._counts += 1
def counts(self):
return self._counts
当你给 DataTracker 类加上 ray.remote
装饰器时,它就变成了一个 actor。这个 actor 能够跟踪状态,例如计数,并且它的方法是 Ray actor 任务,你可以像调用函数一样使用 .remote()
来调用它们。修改 retrieve_task
以包含这个 actor。
@ray.remote
def retrieve_tracker_task(item, tracker, db):
time.sleep(item / 10.)
tracker.increment.remote()
return item, db[item]
tracker = DataTracker.remote()
object_references = [
retrieve_tracker_task.remote(item, tracker, db_object_ref) for item in range(8)
]
data = ray.get(object_references)
print(data)
print(ray.get(tracker.counts.remote()))
[(0, 'Learning'), (1, 'Ray'), (2, 'Flexible'), (3, 'Distributed'), (4, 'Python'), (5, 'for'), (6, 'Machine'), (7, 'Learning')]
8
正如预期,此计算的结果是 8。尽管你不需要 actor 来执行此计算,但这展示了一种在集群中维护状态的方式,可能涉及多个任务。事实上,你可以将该 actor 传递给任何相关任务,甚至传递给另一个 actor 的构造函数。Ray API 非常灵活,提供了无限的可能性。分布式 Python 工具很少允许有状态计算,这对于运行复杂的分布式算法(如强化学习)特别有用。
总结#
在此示例中,你仅使用了六个 API 方法。其中包括用于初始化集群的 ray.init()
,用于将函数和类转换为任务和 actor 的 @ray.remote
,用于将值传输到 Ray 对象存储的 ray.put()
,以及用于从集群检索对象的 ray.get()
。此外,你还对 actor 方法或任务使用了 .remote()
以在集群上执行代码,以及使用 ray.wait
防止阻塞调用。
Ray API 包含不止这六个调用,但如果你刚开始使用,这六个调用已经很强大了。更概括地总结一下,方法如下所示
ray.init()
: 初始化你的 Ray 集群。传入地址可以连接到现有集群。@ray.remote
: 将函数转换为任务,将类转换为 actor。ray.put()
: 将值放入 Ray 的对象存储。ray.get()
: 从对象存储中获取值。返回你放入的对象或由任务或 actor 计算出的值。.remote()
: 在你的 Ray 集群上运行 actor 方法或任务,并用于实例化 actor。ray.wait()
: 返回两个对象引用列表,一个包含我们正在等待的已完成任务,另一个包含未完成任务。
想了解更多?#
此示例是 “Learning Ray” 一书中 Ray Core 演练的简化版本。如果你喜欢它,请查看Ray Core 示例库或我们用例库中的一些 ML 工作负载。