Ray Client#

警告

Ray Client 需要 pip 包 ray[client]。如果您安装的是最小化的 Ray(例如 pip install ray),请执行 pip install ray[client] 重新安装。

什么是 Ray Client?

Ray Client 是一个 API,用于将 Python 脚本连接到**远程** Ray 集群。它能让您像在本地机器上运行 Ray 一样,充分利用远程 Ray 集群。

通过将 ray.init() 更改为 ray.init("ray://<head_node_host>:<port>"),您可以直接从笔记本电脑(或任何地方)连接到远程集群并扩展您的 Ray 代码,同时还能在 Python shell 中进行交互式开发。**这仅适用于 Ray 1.5+。**

# You can run this code outside of the Ray cluster!
import ray

# Starting the Ray client. This connects to a remote Ray cluster.
ray.init("ray://<head_node_host>:10001")

# Normal Ray code follows
@ray.remote
def do_work(x):
    return x ** x

do_work.remote(2)
#....

何时使用 Ray Client#

注意

Ray Client 存在架构限制,在使用 Ray 进行 ML 工作负载(如 Ray Tune 或 Ray Train)时可能无法按预期工作。对于 ML 项目的交互式开发,请使用 Ray Jobs API

当您想将交互式 Python shell 连接到**远程**集群时,可以使用 Ray Client。

  • 如果您已经在 <head_node_host> 上设置了远程集群,并希望进行交互式工作,请使用 ray.init("ray://<head_node_host>:10001")(Ray Client)。这将使您的 shell 连接到集群。有关设置集群的更多详细信息,请参阅 使用 Ray Client 部分。

  • 如果您正在本地开发并希望连接到现有集群(即已运行 ray start --head),或者自动创建本地集群并直接连接到它,请使用 ray.init()(非客户端连接,未指定地址)。这也可以用于 Ray Job 提交。

Ray Client 对于在本地 Python shell 中进行交互式开发非常有用。但是,它需要与远程集群保持稳定的连接,如果连接丢失超过 30 秒,工作负载将终止。如果您有一个希望在集群上运行的长时间运行的工作负载,我们建议改用 Ray Jobs

Client 参数#

当传递到 ray.init 的地址以 ray:// 开头时,就使用了 Ray Client。除了地址之外,Client 模式目前还接受两个其他参数:

  • namespace(可选):设置会话的命名空间。

  • runtime_env(可选):设置会话的 运行时环境,允许您动态指定环境变量、包、本地文件等。

# Connects to an existing cluster at 1.2.3.4 listening on port 10001, using
# the namespace "my_namespace". The Ray workers will run inside a cluster-side
# copy of the local directory "files/my_project", in a Python environment with
# `toolz` and `requests` installed.
ray.init(
    "ray://1.2.3.4:10001",
    namespace="my_namespace",
    runtime_env={"working_dir": "files/my_project", "pip": ["toolz", "requests"]},
)
#....

如何使用 Ray Client?#

步骤 1:设置您的 Ray 集群#

如果您有一个正在运行的 Ray 集群(版本 >= 1.5),Ray Client 服务器很可能默认已经在主节点的 10001 端口上运行。否则,您需要创建一个 Ray 集群。要本地启动 Ray 集群,您可以运行:

ray start --head

要远程启动 Ray 集群,您可以遵循 入门指南 中的说明。

如果需要,您可以通过指定 --ray-client-server-port=...ray start 命令,将 Ray Client 服务器端口更改为非 10001

步骤 2:配置访问#

确保您的本地机器可以访问主节点上的 Ray Client 端口。

最简单的方法是使用 SSH 端口转发或 K8s 端口转发。这使您可以通过 localhost 连接到主节点上的 Ray Client 服务器。

首先,打开一个到 Ray 集群的 SSH 连接,并转发监听端口(10001)。对于使用 Ray Cluster launcher 启动的集群,操作如下:

$ ray up cluster.yaml
$ ray attach cluster.yaml -p 10001

然后,**从另一个终端**使用 localhost 作为 head_node_host 连接到 Ray 集群。

import ray

# This will connect to the cluster via the open SSH session.
ray.init("ray://:10001")

# Normal Ray code follows
@ray.remote
def do_work(x):
    return x ** x

do_work.remote(2)

#....

步骤 3:运行 Ray 代码#

现在,使用以下命令连接到 Ray 集群,然后像往常一样使用 Ray:

import ray

# replace with the appropriate host and port
ray.init("ray://<head_node_host>:10001")

# Normal Ray code follows
@ray.remote
def do_work(x):
    return x ** x

do_work.remote(2)

#....

替代连接方法:#

与端口转发不同,如果您的计算机能够通过网络访问主节点,您可以直接连接到主节点上的 Ray Client 服务器。如果您的计算机与集群在同一网络中,或者可以通过 VPN 连接到集群,则这是个可行的选项。

如果您的计算机没有直接访问权限,您可以修改网络配置以授予访问权限。在 EC2 上,可以通过修改安全组,允许从您的本地 IP 地址到 Ray Client 服务器端口(默认为 10001)的入站访问来完成。

通过 Ray 集群启动器,您可以通过在 cluster.yaml 文件中定义 provider.security_group 来配置安全组以允许入站访问。

# An unique identifier for the head node and workers of this cluster.
cluster_name: minimal_security_group

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    security_group:
        GroupName: ray_client_security_group
        IpPermissions:
              - FromPort: 10001
                ToPort: 10001
                IpProtocol: TCP
                IpRanges:
                    # Allow traffic only from your local IP address.
                    - CidrIp: <YOUR_IP_ADDRESS>/32

警告

任何拥有 Ray Client 访问权限的人都可以执行任意代码在 Ray 集群上。

请勿将其暴露给 `0.0.0.0/0`。

连接到多个 Ray 集群(实验性)#

Ray Client 允许在一个 Python 进程中连接到多个 Ray 集群。要做到这一点,只需将 allow_multiple=True 传递给 ray.init

import ray
# Create a default client.
ray.init("ray://<head_node_host_cluster>:10001")

# Connect to other clusters.
cli1 = ray.init("ray://<head_node_host_cluster_1>:10001", allow_multiple=True)
cli2 = ray.init("ray://<head_node_host_cluster_2>:10001", allow_multiple=True)

# Data is put into the default cluster.
obj = ray.put("obj")

with cli1:
    obj1 = ray.put("obj1")

with cli2:
    obj2 = ray.put("obj2")

with cli1:
    assert ray.get(obj1) == "obj1"
    try:
        ray.get(obj2)  # Cross-cluster ops not allowed.
    except:
        print("Failed to get object which doesn't belong to this cluster")

with cli2:
    assert ray.get(obj2) == "obj2"
    try:
        ray.get(obj1)  # Cross-cluster ops not allowed.
    except:
        print("Failed to get object which doesn't belong to this cluster")
assert "obj" == ray.get(obj)
cli1.disconnect()
cli2.disconnect()

使用 Ray 多客户端时,有几点不同的行为需要注意:

  • 客户端不会自动断开连接。请显式调用 disconnect 来关闭连接。

  • 对象引用只能由获取它的客户端使用。

  • 不带 allow_multipleray.init 将创建一个默认的全局 Ray 客户端。

须知#

客户端断开连接#

当客户端断开连接时,服务器代表客户端持有的任何对象或 Actor 引用都将被丢弃,就如同直接从集群断开连接一样。

如果客户端意外断开连接(例如,由于网络故障),客户端将尝试重新连接服务器 30 秒,然后所有引用才会被丢弃。您可以通过设置环境变量 RAY_CLIENT_RECONNECT_GRACE_PERIOD=N 来增加此时间,其中 N 是客户端在放弃之前尝试重新连接的秒数。

版本要求#

通常,客户端 Ray 版本必须与服务器 Ray 版本匹配。如果使用不兼容的版本,将引发错误。

同样,客户端和服务器之间的次要 Python 版本(例如 3.6 vs 3.7)必须匹配。如果不匹配,将引发错误。

在旧版 Ray 上启动连接#

如果您在使用 ray.init("ray://...") 时遇到 socket.gaierror: [Errno -2] Name or service not known 错误,则您可能使用的是 Ray 1.5 之前的版本,该版本不支持通过 ray.init 启动客户端连接。

通过 Ingress 进行连接#

如果您在使用 Ingress 连接到 Ray Cluster 时遇到以下错误消息,这可能由 Ingress 的配置引起。

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = ""
    debug_error_string = "{"created":"@1628668820.164591000","description":"Error received from peer ipv4:10.233.120.107:443","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"","grpc_status":3}"
>
Got Error from logger channel -- shutting down: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = ""
    debug_error_string = "{"created":"@1628668820.164713000","description":"Error received from peer ipv4:10.233.120.107:443","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"","grpc_status":3}"
>

如果您使用的是 nginx-ingress-controller,您可以通过添加以下 Ingress 配置来解决问题。

metadata:
  annotations:
     nginx.ingress.kubernetes.io/server-snippet: |
       underscores_in_headers on;
       ignore_invalid_headers on;

Ray Client 日志#

Ray Client 日志可以在主节点的 /tmp/ray/session_latest/logs 找到。

上传#

如果在运行时环境中指定了 working_dir,在运行 ray.init() 时,Ray Client 会将笔记本电脑上的 working_dir 上传到 /tmp/ray/session_latest/runtime_resources/_ray_pkg_<directory 内容 hash>

Ray 工作进程将在集群的 /tmp/ray/session_latest/runtime_resources/_ray_pkg_<directory 内容 hash> 目录中启动。这意味着,远程任务和 Actor 中的相对路径在笔记本电脑和集群上都可以工作,无需任何代码更改。例如,如果笔记本电脑上的 working_dir 包含 data.txtrun.py,那么在 run.py 的远程任务定义中,您可以直接使用相对路径 "data.txt"。然后 python run.py 将在您的笔记本电脑上运行,在集群上也能运行。顺带一提,由于代码中可以使用相对路径,绝对路径仅用于调试目的。

故障排除#

错误:尝试重新连接已清理会话#

当 Ray Client 重新连接到一个不识别该客户端的主节点时,就会发生此错误。如果主节点意外重启并丢失状态,可能会发生这种情况。在 Kubernetes 上,如果主 Pod 在被驱逐或崩溃后重启,也可能发生这种情况。