常见问题#

分布式应用提供了强大的能力,但也增加了复杂性。Ray 的一些行为可能最初会让用户感到惊讶,但这些设计选择在分布式计算环境中服务于重要的目的。

本文档概述了在集群中运行 Ray 时遇到的常见问题,重点强调了与在本地运行 Ray 的关键差异。

环境变量不会从 Driver 进程传递给 Worker 进程#

问题: 当你在 Driver 上设置环境变量时,它不会传播到 Worker 进程。

示例: 假设你在运行 Ray 的目录中有一个文件 baz.py,并且执行以下命令

import ray
import os

ray.init()


@ray.remote
def myfunc():
    myenv = os.environ.get("FOO")
    print(f"myenv is {myenv}")
    return 1


ray.get(myfunc.remote())
# this prints: "myenv is None"

预期行为: 用户可能期望在 Driver 上设置环境变量会将其发送给所有 Worker 进程,就像在单机上运行一样,但实际上并非如此。

解决方法: 启用运行时环境 (Runtime Environments) 来明确传递环境变量。当你调用 ray.init(runtime_env=...) 时,它会将指定的环境变量发送给 Workers。或者,你可以将环境变量设置为集群设置配置的一部分。

ray.init(runtime_env={"env_vars": {"FOO": "bar"}})


@ray.remote
def myfunc():
    myenv = os.environ.get("FOO")
    print(f"myenv is {myenv}")
    return 1


ray.get(myfunc.remote())
# this prints: "myenv is bar"

文件名有时能用,有时不能用#

问题: 在 Task 或 Actor 中通过文件名引用文件有时会成功,有时会失败。这种不一致性出现的原因是 Task 或 Actor 在 Head 节点上运行时能找到文件,但在其他机器上可能不存在该文件。

示例: 考虑以下场景

% touch /tmp/foo.txt

以及这段代码

import os
import ray

@ray.remote
def check_file():
    foo_exists = os.path.exists("/tmp/foo.txt")
    return foo_exists

futures = []
for _ in range(1000):
    futures.append(check_file.remote())

print(ray.get(futures))

在这种情况下,你可能会收到 True 和 False 的混合结果。如果 check_file() 在 Head 节点或本地运行,它会找到文件;但在 Worker 节点上则找不到。

预期行为: 用户通常期望文件引用要么始终有效,要么始终失败,而不是表现出不一致的行为。

解决方法

— 对于此类应用,只使用共享文件路径。例如,网络文件系统或 S3 存储可以提供所需的一致性。— 避免依赖本地文件在机器之间保持一致。

安置组 (Placement Groups) 不具备可组合性#

问题: 如果你从在安置组 (Placement Group) 内运行的任务或 Actor 中调度新任务,系统可能无法正确分配资源,导致操作挂起。

示例: 想象一下你正在使用 Ray Tune (它会创建安置组),并希望将其应用于一个目标函数,而该目标函数又使用了 Ray Tasks。例如

import ray
from ray import tune
from ray.util.placement_group import PlacementGroupSchedulingStrategy

def create_task_that_uses_resources():
    @ray.remote(num_cpus=10)
    def sample_task():
        print("Hello")
        return

    return ray.get([sample_task.remote() for i in range(10)])

def objective(config):
    create_task_that_uses_resources()

tuner = tune.Tuner(objective, param_space={"a": 1})
tuner.fit()

这段代码会报错,提示信息如下:

ValueError: Cannot schedule create_task_that_uses_resources.<locals>.sample_task with the placement group
because the resource request {'CPU': 10} cannot fit into any bundles for the placement group, [{'CPU': 1.0}].

预期行为: 代码成功执行,没有资源分配问题。

解决方法: 确保在 create_task_that_uses_resources() 内调用的任务的 @ray.remote 声明中,包含参数 scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=None)

def create_task_that_uses_resources():
+     @ray.remote(num_cpus=10, scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=None))
-     @ray.remote(num_cpus=10)

过时的函数定义#

由于 Python 的微妙之处,重新定义远程函数可能不会总是更新 Ray 以使用最新版本。例如,假设你定义了一个远程函数 f,然后重新定义它;Ray 应该使用新的定义。

import ray

@ray.remote
def f():
    return 1

@ray.remote
def f():
    return 2

print(ray.get(f.remote()))  # This should print 2.
2

然而,在某些情况下,修改远程函数需要重启集群才能生效。

导入函数问题: 如果 f 定义在外部文件 (例如,file.py) 中,并且你修改了它的定义,再次导入该文件可能会被忽略,因为 Python 将随后的导入视为无操作 (no-op)。一个解决方法是使用 from importlib import reload; reload(file) 来代替第二次导入。

辅助函数依赖: 如果 f 依赖于定义在外部文件中的辅助函数 h,对 h 的更改可能不会传播。最简单的解决方案是重启 Ray 集群。或者,你可以重新定义 f,使其在调用 h 之前重新加载 file.py

@ray.remote
def f():
    from importlib import reload
    reload(file)
    return file.h()

这将强制外部模块在 Workers 上重新加载。请注意,在 Python 3 中,你必须使用 from importlib import reload

捕获任务和 Actor 调用位置#

当你调用任务、创建 Actor 或调用 Actor 方法时,Ray 会捕获并显示堆栈跟踪。

要启用调用位置捕获,设置环境变量 RAY_record_task_actor_creation_sites=true。启用后

— Ray 在创建任务、Actor 或调用 Actor 方法时捕获堆栈跟踪。— 捕获的堆栈跟踪可在 Ray Dashboard(在任务和 Actor 详细信息下)、状态 CLI 命令 ray list task --detail 的输出以及状态 API 响应中查看。

请注意,Ray 默认关闭堆栈跟踪捕获,因为这可能影响性能。仅在调试需要时启用它。

示例

import ray

# Enable stack trace capture
ray.init(runtime_env={"env_vars": {"RAY_record_task_actor_creation_sites": "true"}})

@ray.remote
def my_task():
    return 42

# Capture the stack trace upon task invocation.
future = my_task.remote()
result = ray.get(future)

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# Capture the stack trace upon actor creation.
counter = Counter.remote()

# Capture the stack trace upon method invocation.
counter.increment.remote()

本文档概述了使用 Ray 时遇到的常见问题以及潜在的解决方案。如果你遇到其他问题,请报告。