Ray Core 是什么?#

Ray Core 是一个强大的分布式计算框架,它提供了一组简洁的核心原语(任务、Actor 和对象),用于构建和扩展分布式应用程序。本教程将通过简单的示例介绍这些核心概念,展示如何将 Python 函数和类转换为分布式 Ray 任务和 Actor,以及如何有效地使用 Ray 对象。

注意

Ray 引入了一个实验性的 API,用于高性能工作负载,特别适用于使用多个 GPU 的应用程序。有关更多详细信息,请参阅 Ray 编译图

入门#

要开始使用,请使用 pip install -U ray 安装 Ray。有关其他安装选项,请参阅 安装 Ray

第一步是导入并初始化 Ray

import ray

ray.init()

注意

除非您显式调用 ray.init(),否则 Ray 远程 API 的首次调用将隐式地调用不带参数的 ray.init()

运行任务#

任务是跨 Ray 集群并行化 Python 函数的最简单方法。要创建任务:

  1. 使用 @ray.remote 装饰您的函数,以指示它应该远程运行

  2. 使用 .remote() 调用函数,而不是常规函数调用

  3. 使用 ray.get() 从返回的 Future(Ray *对象引用*)中检索结果

这是一个简单的示例

# Define the square task.
@ray.remote
def square(x):
    return x * x

# Launch four parallel square tasks.
futures = [square.remote(i) for i in range(4)]

# Retrieve results.
print(ray.get(futures))
# -> [0, 1, 4, 9]

调用 Actor#

虽然任务是无状态的,但 Ray Actor 允许您创建有状态的 Worker,它们可以在方法调用之间保持其内部状态。当您实例化一个 Ray Actor 时:

  1. Ray 会在您的集群中的某个位置启动一个专用的 Worker 进程

  2. Actor 的方法在该特定 Worker 上运行,并可以访问和修改其状态

  3. Actor 按接收顺序串行执行方法调用,以保持一致性

这是一个简单的 Counter 示例

# Define the Counter actor.
@ray.remote
class Counter:
    def __init__(self):
        self.i = 0

    def get(self):
        return self.i

    def incr(self, value):
        self.i += value

# Create a Counter actor.
c = Counter.remote()

# Submit calls to the actor. These calls run asynchronously but in
# submission order on the remote actor process.
for _ in range(10):
    c.incr.remote(1)

# Retrieve final actor state.
print(ray.get(c.get.remote()))
# -> 10

前面的示例演示了 Actor 的基本用法。有关结合任务和 Actor 的更全面的示例,请参阅 蒙特卡洛 Pi 估算示例

传递对象#

Ray 的分布式对象存储高效地管理着整个集群中的数据。有三种主要的方法可以处理 Ray 中的对象:

  1. 隐式创建:当任务和 Actor 返回值时,它们会自动存储在 Ray 的 分布式对象存储 中,并返回可以稍后检索的 *对象引用*。

  2. 显式创建:使用 ray.put() 直接将对象放入存储中。

  3. 传递引用:您可以将对象引用传递给其他任务和 Actor,从而避免不必要的数据复制并实现惰性计算。

这是一个展示这些技术的示例

import numpy as np

# Define a task that sums the values in a matrix.
@ray.remote
def sum_matrix(matrix):
    return np.sum(matrix)

# Call the task with a literal argument value.
print(ray.get(sum_matrix.remote(np.ones((100, 100)))))
# -> 10000.0

# Put a large array into the object store.
matrix_ref = ray.put(np.ones((1000, 1000)))

# Call the task with the object reference as an argument.
print(ray.get(sum_matrix.remote(matrix_ref)))
# -> 1000000.0

后续步骤#

提示

要监控应用程序的性能和资源使用情况,请查看 Ray 仪表板

您可以以强大的方式组合 Ray 的简单原语来表达几乎任何分布式计算模式。要深入了解 Ray 的 核心概念,请探索这些用户指南: