什么是 Ray Core?#

Ray Core 是一个强大的分布式计算框架,它提供了一小套基本原语(任务、Actor 和对象),用于构建和扩展分布式应用。本教程通过简单的示例向您介绍这些核心概念,演示如何将 Python 函数和类转换为分布式的 Ray 任务和 Actor,以及如何有效地处理 Ray 对象。

注意

Ray 为高性能工作负载引入了一个实验性 API,特别适用于使用多个 GPU 的应用。有关详细信息,请参阅 Ray 编译图

入门#

要开始使用,请使用 pip install -U ray 安装 Ray。有关更多安装选项,请参阅 安装 Ray

第一步是导入和初始化 Ray

import ray

ray.init()

注意

在 Ray 的最新版本(>=1.5)中,首次使用 Ray 远程 API 时会自动调用 ray.init()

运行任务#

任务是跨 Ray 集群并行化 Python 函数的最简单方法。要创建一个任务

  1. 使用 @ray.remote 装饰你的函数,表明它应该远程运行

  2. 使用 .remote() 调用函数,而不是正常的函数调用

  3. 使用 ray.get() 从返回的 future(Ray 对象引用)中检索结果

这里有一个简单的例子

# Define the square task.
@ray.remote
def square(x):
    return x * x

# Launch four parallel square tasks.
futures = [square.remote(i) for i in range(4)]

# Retrieve results.
print(ray.get(futures))
# -> [0, 1, 4, 9]

调用 Actor#

任务是无状态的,而 Ray Actor 允许你创建有状态的工作进程,这些进程在方法调用之间保持其内部状态。当你实例化一个 Ray Actor 时

  1. Ray 会在集群中的某个地方启动一个专用的工作进程

  2. 该 Actor 的方法在该特定工作进程上运行,可以访问和修改其状态

  3. 该 Actor 按接收顺序串行执行方法调用,保持一致性

这里有一个简单的计数器示例

# Define the Counter actor.
@ray.remote
class Counter:
    def __init__(self):
        self.i = 0

    def get(self):
        return self.i

    def incr(self, value):
        self.i += value

# Create a Counter actor.
c = Counter.remote()

# Submit calls to the actor. These calls run asynchronously but in
# submission order on the remote actor process.
for _ in range(10):
    c.incr.remote(1)

# Retrieve final actor state.
print(ray.get(c.get.remote()))
# -> 10

上面的示例演示了基本的 Actor 用法。有关结合任务和 Actor 的更全面示例,请参阅 Monte Carlo Pi 估算示例

传递对象#

Ray 的分布式对象存储有效地管理集群中的数据。在 Ray 中处理对象主要有三种方式

  1. 隐式创建:当任务和 Actor 返回值时,它们会自动存储在 Ray 的分布式对象存储中,返回 对象引用,这些引用以后可以用来检索值。

  2. 显式创建:使用 ray.put() 直接将对象放入存储中。

  3. 传递引用:你可以将对象引用传递给其他任务和 Actor,避免不必要的数据复制并启用惰性执行。

这里有一个示例展示这些技术

import numpy as np

# Define a task that sums the values in a matrix.
@ray.remote
def sum_matrix(matrix):
    return np.sum(matrix)

# Call the task with a literal argument value.
print(ray.get(sum_matrix.remote(np.ones((100, 100)))))
# -> 10000.0

# Put a large array into the object store.
matrix_ref = ray.put(np.ones((1000, 1000)))

# Call the task with the object reference as an argument.
print(ray.get(sum_matrix.remote(matrix_ref)))
# -> 1000000.0

后续步骤#

提示

要监控应用的性能和资源使用情况,请查看 Ray dashboard

你可以通过强大的方式组合 Ray 的简单原语,以表达几乎任何分布式计算模式。要深入了解 Ray 的关键概念,请浏览这些用户指南