Ray Client#

警告

Ray Client 需要 pip 包 ray[client]。如果您安装了最小版本的 Ray(例如 pip install ray),请执行 pip install ray[client] 重新安装。

什么是 Ray Client?

Ray Client 是一个 API,用于将 Python 脚本连接到远程 Ray 集群。实际上,它允许您像在本地机器上运行 Ray 一样利用远程 Ray 集群。

通过将 ray.init() 更改为 ray.init("ray://<head_node_host>:<port>"),您可以直接从您的笔记本电脑(或任何地方)连接到远程集群并扩展您的 Ray 代码,同时保留在 Python shell 中进行交互式开发的能力。这仅适用于 Ray 1.5+。

# You can run this code outside of the Ray cluster!
import ray

# Starting the Ray client. This connects to a remote Ray cluster.
ray.init("ray://<head_node_host>:10001")

# Normal Ray code follows
@ray.remote
def do_work(x):
    return x ** x

do_work.remote(2)
#....

何时使用 Ray Client#

注意

Ray Client 存在架构限制,在用于 ML 工作负载(如 Ray Tune 或 Ray Train)时可能无法按预期工作。对于 ML 项目的交互式开发,请使用 Ray Jobs API

当您想将交互式 Python shell 连接到远程集群时,可以使用 Ray Client。

  • 如果您已经在 <head_node_host> 设置了远程集群并且想要进行交互式工作,请使用 ray.init("ray://<head_node_host>:10001") (Ray Client)。这会将您的 shell 连接到集群。有关设置集群的更多详细信息,请参阅 使用 Ray Client 部分。

  • 如果您正在本地开发并想连接到现有集群(即已经运行了 ray start --head),或者自动创建一个本地集群并直接连接到它,请使用 ray.init()(非客户端连接,未指定地址)。这也可用于 Ray Job 提交。

Ray Client 对于在本地 Python shell 中进行交互式开发非常有用。但是,它需要与远程集群建立稳定的连接,如果连接丢失超过 30 秒,它将终止工作负载。如果您想在集群上运行长时间运行的工作负载,我们建议使用 Ray Jobs

客户端参数#

当传递给 ray.init 的地址以 ray:// 开头时,将使用 Ray Client。除了地址之外,客户端模式当前接受另外两个参数

  • namespace (可选):为会话设置命名空间。

  • runtime_env (可选):为会话设置 运行时环境,允许您动态指定环境变量、包、本地文件等。

# Connects to an existing cluster at 1.2.3.4 listening on port 10001, using
# the namespace "my_namespace". The Ray workers will run inside a cluster-side
# copy of the local directory "files/my_project", in a Python environment with
# `toolz` and `requests` installed.
ray.init(
    "ray://1.2.3.4:10001",
    namespace="my_namespace",
    runtime_env={"working_dir": "files/my_project", "pip": ["toolz", "requests"]},
)
#....

如何使用 Ray Client?#

步骤 1:设置您的 Ray 集群#

如果您有正在运行的 Ray 集群(版本 >= 1.5),Ray Client 服务器默认可能已在头节点端口 10001 上运行。否则,您需要创建一个 Ray 集群。要在本地启动 Ray 集群,您可以运行

ray start --head

要在远程启动 Ray 集群,您可以按照 入门 中的说明进行操作。

如有必要,您可以通过在 ray start 命令 中指定 --ray-client-server-port=... 来修改 Ray Client 服务器端口,使其不是 10001

步骤 2:配置访问#

确保您的本地机器可以访问头节点上的 Ray Client 端口。

实现此目的的最简单方法是使用 SSH 端口转发或 K8s 端口转发。这使您可以通过 localhost 连接到头节点上的 Ray Client 服务器。

首先,与您的 Ray 集群建立 SSH 连接并转发监听端口 (10001)。对于使用 Ray Cluster 启动器启动的集群,这看起来像

$ ray up cluster.yaml
$ ray attach cluster.yaml -p 10001

然后从另一个终端使用 localhost 作为 head_node_host 连接到 Ray 集群。

import ray

# This will connect to the cluster via the open SSH session.
ray.init("ray://localhost:10001")

# Normal Ray code follows
@ray.remote
def do_work(x):
    return x ** x

do_work.remote(2)

#....

步骤 3:运行 Ray 代码#

现在,使用以下命令连接到 Ray 集群,然后像往常一样使用 Ray

import ray

# replace with the appropriate host and port
ray.init("ray://<head_node_host>:10001")

# Normal Ray code follows
@ray.remote
def do_work(x):
    return x ** x

do_work.remote(2)

#....

替代连接方法:#

除了端口转发,如果您的计算机能够访问头节点的网络,也可以直接连接到头节点上的 Ray Client 服务器。如果您的计算机与集群在同一网络中,或者可以通过 VPN 连接到集群,则可以选择此方法。

如果您的计算机没有直接访问权限,可以修改网络配置来授予访问权限。在 EC2 上,可以通过修改安全组来允许从您的本地 IP 地址到 Ray Client 服务器端口(默认为 10001)的入站访问。

使用 Ray 集群启动器,您可以通过在 cluster.yaml 中定义 provider.security_group 来配置安全组以允许入站访问。

# An unique identifier for the head node and workers of this cluster.
cluster_name: minimal_security_group

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    security_group:
        GroupName: ray_client_security_group
        IpPermissions:
              - FromPort: 10001
                ToPort: 10001
                IpProtocol: TCP
                IpRanges:
                    # Allow traffic only from your local IP address.
                    - CidrIp: <YOUR_IP_ADDRESS>/32

警告

任何拥有 Ray Client 访问权限的人都可以在 Ray 集群上执行任意代码。

请勿将其暴露给 0.0.0.0/0

连接到多个 Ray 集群 (实验性)#

Ray Client 允许在一个 Python 进程中连接到多个 Ray 集群。要实现这一点,只需将 allow_multiple=True 传递给 ray.init

import ray
# Create a default client.
ray.init("ray://<head_node_host_cluster>:10001")

# Connect to other clusters.
cli1 = ray.init("ray://<head_node_host_cluster_1>:10001", allow_multiple=True)
cli2 = ray.init("ray://<head_node_host_cluster_2>:10001", allow_multiple=True)

# Data is put into the default cluster.
obj = ray.put("obj")

with cli1:
    obj1 = ray.put("obj1")

with cli2:
    obj2 = ray.put("obj2")

with cli1:
    assert ray.get(obj1) == "obj1"
    try:
        ray.get(obj2)  # Cross-cluster ops not allowed.
    except:
        print("Failed to get object which doesn't belong to this cluster")

with cli2:
    assert ray.get(obj2) == "obj2"
    try:
        ray.get(obj1)  # Cross-cluster ops not allowed.
    except:
        print("Failed to get object which doesn't belong to this cluster")
assert "obj" == ray.get(obj)
cli1.disconnect()
cli2.disconnect()

使用 Ray 多客户端时,需要注意一些不同的行为

  • 客户端不会自动断开连接。请显式调用 disconnect 来关闭连接。

  • 对象引用只能由获取它的客户端使用。

  • 没有 allow_multipleray.init 将创建一个默认的全局 Ray 客户端。

需要了解的事项#

客户端断开连接#

当客户端断开连接时,服务器代表客户端持有的任何对象或 actor 引用都会被丢弃,就像直接从集群断开连接一样。

如果客户端意外断开连接(例如由于网络故障),客户端将尝试重新连接服务器 30 秒,之后所有引用将被丢弃。您可以通过设置环境变量 RAY_CLIENT_RECONNECT_GRACE_PERIOD=N 来增加此时间,其中 N 是客户端在放弃之前尝试重新连接的秒数。

版本要求#

通常,客户端 Ray 版本必须与服务器 Ray 版本匹配。如果使用了不兼容的版本,将引发错误。

类似地,客户端和服务器之间的次要 Python 版本(例如,3.6 对比 3.7)必须匹配。如果情况并非如此,将引发错误。

在旧版本 Ray 上启动连接#

如果在使用 ray.init("ray://...") 时遇到 socket.gaierror: [Errno -2] Name or service not known 错误,则您可能使用的是 Ray 1.5 之前的版本,该版本不支持通过 ray.init 启动客户端连接。

通过 Ingress 连接#

如果您在使用 Ingress 连接到 Ray Cluster 时遇到以下错误消息,这可能是由 Ingress 的配置引起的。

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = ""
    debug_error_string = "{"created":"@1628668820.164591000","description":"Error received from peer ipv4:10.233.120.107:443","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"","grpc_status":3}"
>
Got Error from logger channel -- shutting down: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = ""
    debug_error_string = "{"created":"@1628668820.164713000","description":"Error received from peer ipv4:10.233.120.107:443","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"","grpc_status":3}"
>

如果您使用的是 nginx-ingress-controller,您可以通过添加以下 Ingress 配置来解决此问题。

metadata:
  annotations:
     nginx.ingress.kubernetes.io/server-snippet: |
       underscores_in_headers on;
       ignore_invalid_headers on;

Ray 客户端日志#

Ray 客户端日志可以在头节点上的 /tmp/ray/session_latest/logs 找到。

上传#

如果在运行时环境中指定了 working_dir,当运行 ray.init() 时,Ray 客户端会将笔记本电脑上的 working_dir 上传到 /tmp/ray/session_latest/runtime_resources/_ray_pkg_<hash of directory contents>

Ray worker 会在集群上的 /tmp/ray/session_latest/runtime_resources/_ray_pkg_<hash of directory contents> 目录中启动。这意味着代码中远程任务和 actor 的相对路径在笔记本电脑和集群上都可以工作,无需任何代码更改。例如,如果笔记本电脑上的 working_dir 包含 data.txtrun.py,则在 run.py 中的远程任务定义中,只需使用相对路径 "data.txt" 即可。这样,python run.py 可以在我的笔记本电脑上工作,也可以在集群上工作。顺带一提,由于代码中可以使用相对路径,绝对路径仅对调试有用。

故障排除#

错误:尝试重新连接一个已清理的会话#

当 Ray Client 重新连接到无法识别该客户端的头节点时,会发生此错误。如果头节点意外重启并丢失状态,可能会发生这种情况。在 Kubernetes 上,如果头 Pod 在被驱逐或崩溃后重新启动,可能会发生这种情况。