使用 Ray Jobs CLI 快速入门#

本指南介绍了用于提交和与 Ray 作业交互的 Ray Jobs CLI 命令。

要使用 Python SDK 以程序方式而非 CLI 使用 Jobs API,请参阅Python SDK 概述

设置#

Ray Jobs 在 1.9+ 版本中可用,需要完整安装 Ray。您可以通过运行以下命令安装 Ray:

pip install "ray[default]"

有关安装 Ray 的更多详细信息,请参阅安装指南

要提交作业,您需要向 Ray Cluster 发送 HTTP 请求。本指南假设您正在使用本地 Ray Cluster,您可以通过运行以下命令启动它:

ray start --head
# ...
# 2022-08-10 09:54:57,664   INFO services.py:1476 -- View the Ray dashboard at http://127.0.0.1:8265
# ...

此命令在本地机器上创建一个 Ray head 节点,可用于开发目的。注意启动或连接 Ray Cluster 时标准输出上出现的 Ray Dashboard URL。稍后使用此 URL 提交作业。有关生产部署场景的更多详细信息,请参阅在 VMsKubernetes 上部署 Ray 的指南。

提交作业#

从一个可以在本地运行的示例脚本开始。以下脚本使用 Ray API 提交任务并打印其返回值:

# script.py
import ray

@ray.remote
def hello_world():
    return "hello world"

# Automatically connect to the running Ray cluster.
ray.init()
print(ray.get(hello_world.remote()))

创建一个空工作目录,并在其中创建一个名为 script.py 的文件,文件内容为上文所示 Python 脚本。

| your_working_directory
| ├── script.py

接下来,找到您可以提交作业请求的 Ray Cluster 的 HTTP 地址。将作业提交到与 Ray Dashboard 使用的相同地址。默认情况下,此作业使用端口 8265。

如果您正在使用本地 Ray Cluster (ray start --head),可以直接连接到 http://127.0.0.1:8265。如果您使用的是在 VMs 或 Kubernetes 上启动的 Ray Cluster,请按照相关说明设置客户端的网络访问。有关提示,请参阅使用远程集群

要告知 Ray Jobs CLI 如何找到您的 Ray Cluster,请传递 Ray Dashboard 地址。设置 RAY_ADDRESS 环境变量:

$ export RAY_ADDRESS="http://127.0.0.1:8265"

或者,您也可以将 --address=http://127.0.0.1:8265 标志显式传递给每个 Ray Jobs CLI 命令,或在每个命令前加上 RAY_ADDRESS=http://127.0.0.1:8265

此外,如果您希望为每个 HTTP 请求向 Cluster 传递头部信息,请使用 RAY_JOB_HEADERS 环境变量。此环境变量必须采用 JSON 格式。

$ export RAY_JOB_HEADERS='{"KEY": "VALUE"}'

要提交作业,请使用 ray job submit。请确保在 --working-dir 参数中指定工作目录的路径。对于本地集群,此参数并非严格必需,但对于远程集群,此参数是必需的,以便将工作目录上传到集群。

$ ray job submit --working-dir your_working_directory -- python script.py

# Job submission server address: http://127.0.0.1:8265

# -------------------------------------------------------
# Job 'raysubmit_inB2ViQuE29aZRJ5' submitted successfully
# -------------------------------------------------------

# Next steps
#   Query the logs of the job:
#     ray job logs raysubmit_inB2ViQuE29aZRJ5
#   Query the status of the job:
#     ray job status raysubmit_inB2ViQuE29aZRJ5
#   Request the job to be stopped:
#     ray job stop raysubmit_inB2ViQuE29aZRJ5

# Tailing logs until the job exits (disable with --no-wait):
# hello world

# ------------------------------------------
# Job 'raysubmit_inB2ViQuE29aZRJ5' succeeded
# ------------------------------------------

此命令在 Ray Cluster 的 head 节点上运行入口点脚本,并等待作业完成。请注意,它还会将入口点脚本的标准输出 (stdout) 和标准错误 (stderr) 流回客户端(在此示例中为 hello world)。Ray 还会通过将作为 --working-dir 传递的目录内容下载到集群中的所有节点,使其可供 Ray 作业使用。

注意

双破折号 (--) 将入口点命令的参数(例如 python script.py --arg1=val1)与 ray job submit 的参数分开。

注意

默认情况下,入口点脚本在 head 节点上运行。要覆盖此行为,请在 ray job submit 命令中指定 --entrypoint-num-cpus--entrypoint-num-gpus--entrypoint-resources--entrypoint-memory 参数之一。有关更多详细信息,请参阅指定 CPU 和 GPU 资源

与长时间运行的作业交互#

对于长时间运行的应用,您可能不希望客户端等待作业完成。为此,请将 --no-wait 标志传递给 ray job submit,并使用其他 CLI 命令检查作业状态。尝试使用以下修改后的脚本,它会在无限循环中每秒提交一个任务:

# script.py
import ray
import time

@ray.remote
def hello_world():
    return "hello world"

ray.init()
while True:
    print(ray.get(hello_world.remote()))
    time.sleep(1)

现在提交作业:

$ ray job submit --no-wait --working-dir your_working_directory -- python script.py
# Job submission server address: http://127.0.0.1:8265

# -------------------------------------------------------
# Job 'raysubmit_tUAuCKubPAEXh6CW' submitted successfully
# -------------------------------------------------------

# Next steps
#   Query the logs of the job:
#       ray job logs raysubmit_tUAuCKubPAEXh6CW
#   Query the status of the job:
#       ray job status raysubmit_tUAuCKubPAEXh6CW
#   Request the job to be stopped:
#       ray job stop raysubmit_tUAuCKubPAEXh6CW

稍后我们可以使用提供的 ray job logs 命令获取标准输出:

$ ray job logs raysubmit_tUAuCKubPAEXh6CW
# Job submission server address: http://127.0.0.1:8265
# hello world
# hello world
# hello world
# hello world
# hello world

使用 ray job status 获取作业的当前状态:

$ ray job status raysubmit_tUAuCKubPAEXh6CW
# Job submission server address: http://127.0.0.1:8265
# Status for job 'raysubmit_tUAuCKubPAEXh6CW': RUNNING
# Status message: Job is currently running.

最后,要取消作业,请使用 ray job stop

$ ray job stop raysubmit_tUAuCKubPAEXh6CW
# Job submission server address: http://127.0.0.1:8265
# Attempting to stop job raysubmit_tUAuCKubPAEXh6CW
# Waiting for job 'raysubmit_tUAuCKubPAEXh6CW' to exit (disable with --no-wait):
# Job 'raysubmit_tUAuCKubPAEXh6CW' was stopped

$ ray job status raysubmit_tUAuCKubPAEXh6CW
# Job submission server address: http://127.0.0.1:8265
# Job 'raysubmit_tUAuCKubPAEXh6CW' was stopped

使用远程集群#

前面的示例针对本地 Ray 集群。连接到远程集群时,您需要通过 HTTP 访问集群的 dashboard 端口。

一种访问端口的方法是将本地机器上的 127.0.0.1:8265 端口转发到 head 节点上的 127.0.0.1:8265。如果您使用 Ray Cluster Launcher 启动了远程集群,则可以使用 ray dashboard 命令设置自动端口转发。有关详细信息,请参阅监控集群状态 (ray dashboard/status)

在您的本地机器上运行以下命令,其中 cluster.yaml 是用于启动集群的配置文件:

ray dashboard cluster.yaml

此命令运行后,验证您是否可以在本地浏览器中通过 http://127.0.0.1:8265 查看 Ray Dashboard。同时,验证您是否将环境变量 RAY_ADDRESS 设置为 "http://127.0.0.1:8265"。完成此设置后,您可以像前面的示例一样在本地机器上使用 Jobs CLI 与远程 Ray 集群交互。

在 Kubernetes 上使用 CLI#

前面的说明仍然适用,但您可以使用 kubectl port-forward 实现 dashboard 端口转发:https://kubernetes.ac.cn/docs/tasks/access-application-cluster/port-forward-access-application-cluster/

或者,您可以设置 Ingress 以通过 HTTP 访问集群的 dashboard 端口:https://kubernetes.ac.cn/docs/concepts/services-networking/ingress/

依赖管理#

要运行分布式应用,请确保所有 worker 都在相同的环境中运行。如果同一个 Ray Cluster 中的多个应用具有不同且冲突的依赖项,则此配置可能具有挑战性。

为避免依赖冲突,Ray 提供了一种称为运行时环境的机制。运行时环境允许应用覆盖 Ray Cluster 上的默认环境,并在隔离环境中运行,类似于单节点 Python 中的虚拟环境。依赖项可以包括文件和 Python 包。

Ray Jobs API 提供了一个选项,用于在提交作业时指定运行时环境。在 Ray Cluster 上,Ray 会跨 worker 安装运行时环境,并确保该作业中的任务在相同的环境中运行。为了演示此功能,此 Python 脚本在 Ray 任务中打印 requests 模块的当前版本。

import ray
import requests

@ray.remote
def get_requests_version():
    return requests.__version__

# Note: No need to specify the runtime_env in ray.init() in the driver script.
ray.init()
print("requests version:", ray.get(get_requests_version.remote()))

使用默认环境提交此作业。此环境即您启动 Ray Cluster 时所在的环境。

$ ray job submit -- python script.py
# Job submission server address: http://127.0.0.1:8265
#
# -------------------------------------------------------
# Job 'raysubmit_seQk3L4nYWcUBwXD' submitted successfully
# -------------------------------------------------------
#
# Next steps
#   Query the logs of the job:
#     ray job logs raysubmit_seQk3L4nYWcUBwXD
#   Query the status of the job:
#     ray job status raysubmit_seQk3L4nYWcUBwXD
#   Request the job to be stopped:
#     ray job stop raysubmit_seQk3L4nYWcUBwXD
#
# Tailing logs until the job exits (disable with --no-wait):
# requests version: 2.28.1
#
# ------------------------------------------
# Job 'raysubmit_seQk3L4nYWcUBwXD' succeeded
# ------------------------------------------

现在使用一个指定 requests 模块版本的运行时环境提交作业:

$ ray job submit --runtime-env-json='{"pip": ["requests==2.26.0"]}' -- python script.py
# Job submission server address: http://127.0.0.1:8265

# -------------------------------------------------------
# Job 'raysubmit_vGGV4MiP9rYkYUnb' submitted successfully
# -------------------------------------------------------

# Next steps
#   Query the logs of the job:
#     ray job logs raysubmit_vGGV4MiP9rYkYUnb
#   Query the status of the job:
#     ray job status raysubmit_vGGV4MiP9rYkYUnb
#   Request the job to be stopped:
#     ray job stop raysubmit_vGGV4MiP9rYkYUnb

# Tailing logs until the job exits (disable with --no-wait):
# requests version: 2.26.0

# ------------------------------------------
# Job 'raysubmit_vGGV4MiP9rYkYUnb' succeeded
# ------------------------------------------

注意

如果 Driver 和作业都指定了运行时环境,Ray 会尝试合并它们并在发生冲突时引发异常。有关更多详细信息,请参阅运行时环境